Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix race condition in the C++ implementation of the pre-process FIL stage #1390

2 changes: 1 addition & 1 deletion examples/abp_nvsmi_detection/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ From the Morpheus repo root directory, run:
export MORPHEUS_ROOT=$(pwd)
# Launch Morpheus printing debug messages
morpheus --log_level=DEBUG \
`# Run a pipeline with 8 threads and a model batch size of 32 (Must be equal or less than Triton config)` \
`# Run a pipeline with 8 threads and a model batch size of 1024 (Must be equal or less than Triton config)` \
run --num_threads=8 --pipeline_batch_size=1024 --model_max_batch_size=1024 \
`# Specify a NLP pipeline with 256 sequence length (Must match Triton config)` \
pipeline-fil --columns_file=${MORPHEUS_ROOT}/morpheus/data/columns_fil.txt \
Expand Down
86 changes: 48 additions & 38 deletions morpheus/_lib/src/stages/preprocess_fil.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -101,13 +101,15 @@ PreprocessFILStage::subscribe_fn_t PreprocessFILStage::build_operator()
}
}

// Need to do a transpose here
// Need to convert from row major to column major
// Easiest way to do this is to transpose the data from [fea_len, row_count] to [row_count, fea_len]
auto transposed_data =
MatxUtil::transpose(DevMemInfo{packed_data,
TypeId::FLOAT32,
{x->mess_count, static_cast<TensorIndex>(m_fea_cols.size())},
{1, x->mess_count}});
{static_cast<TensorIndex>(m_fea_cols.size()), x->mess_count},
{x->mess_count, 1}});

// Create the tensor which will be row-major and size [row_count, fea_len]
auto input__0 = Tensor::create(transposed_data,
DType::create<float>(),
{x->mess_count, static_cast<TensorIndex>(m_fea_cols.size())},
Expand All @@ -121,8 +123,8 @@ PreprocessFILStage::subscribe_fn_t PreprocessFILStage::build_operator()
input__0.get_memory(),
x->mess_offset),
seq_id_dtype,
{x->mess_count, 3},
{},
{x->mess_count, 3},
{},
0);

// Build the results
Expand All @@ -134,8 +136,12 @@ PreprocessFILStage::subscribe_fn_t PreprocessFILStage::build_operator()

output.on_next(std::move(next));
},
[&](std::exception_ptr error_ptr) { output.on_error(error_ptr); },
[&]() { output.on_completed(); }));
[&](std::exception_ptr error_ptr) {
output.on_error(error_ptr);
},
[&]() {
output.on_completed();
}));
};
}

Expand All @@ -144,50 +150,54 @@ TableInfo PreprocessFILStage::fix_bad_columns(sink_type_t x)
std::vector<std::string> bad_cols;

{
// TODO(MDD): Add some sort of lock here to prevent fixing columns after they have been accessed
auto df_meta = x->get_meta(m_fea_cols);
auto df_meta_col_names = df_meta.get_column_names();
// Get the mutable info for the entire meta object so we only do this once per dataframe
auto mutable_info = x->meta->get_mutable_info();
auto df_meta_col_names = mutable_info.get_column_names();

for (size_t i = 0; i < df_meta.num_columns(); ++i)
// Only check the feature columns. Leave the rest unchanged
for (auto& fea_col : m_fea_cols)
{
if (df_meta.get_column(i).type().id() == cudf::type_id::STRING)
// Find the index of the column in the dataframe
auto col_idx =
std::find(df_meta_col_names.begin(), df_meta_col_names.end(), fea_col) - df_meta_col_names.begin();

if (col_idx == df_meta_col_names.size())
{
// This feature was not found. Ignore it.
continue;
}

if (mutable_info.get_column(col_idx).type().id() == cudf::type_id::STRING)
{
bad_cols.push_back(df_meta_col_names[i]);
bad_cols.push_back(fea_col);
}
}

// Exit early if there is nothing to do
if (bad_cols.empty())
if (!bad_cols.empty())
{
return df_meta;
}
}
// Need to ensure all string columns have been converted to numbers. This requires running a
// regex which is too difficult to do from C++ at this time. So grab the GIL, make the
// conversions, and release. This is horribly inefficient, but so is the JSON lines format for
// this workflow
using namespace pybind11::literals;
pybind11::gil_scoped_acquire gil;

// Need to ensure all string columns have been converted to numbers. This requires running a
// regex which is too difficult to do from C++ at this time. So grab the GIL, make the
// conversions, and release. This is horribly inefficient, but so is the JSON lines format for
// this workflow
{
// Get the mutable info for the entire meta object so we only do this once per dataframe
auto mutable_info = x->meta->get_mutable_info();
// pybind11::object df = x->meta->get_py_table();
auto df = mutable_info.checkout_obj();

using namespace pybind11::literals;
pybind11::gil_scoped_acquire gil;
std::string regex = R"((\d+))";

// pybind11::object df = x->meta->get_py_table();
auto df = mutable_info.checkout_obj();

std::string regex = R"((\d+))";
for (auto c : bad_cols)
{
df[pybind11::str(c)] = df[pybind11::str(c)]
.attr("str")
.attr("extract")(pybind11::str(regex), "expand"_a = true)
.attr("astype")(pybind11::str("float32"));
}

for (auto c : bad_cols)
{
df[pybind11::str(c)] = df[pybind11::str(c)]
.attr("str")
.attr("extract")(pybind11::str(regex), "expand"_a = true)
.attr("astype")(pybind11::str("float32"));
mutable_info.return_obj(std::move(df));
}

mutable_info.return_obj(std::move(df));
}

// Now re-get the meta
Expand Down