Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix race condition in the C++ implementation of the pre-process FIL stage #1390

65 changes: 31 additions & 34 deletions morpheus/_lib/src/stages/preprocess_fil.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -121,8 +121,8 @@ PreprocessFILStage::subscribe_fn_t PreprocessFILStage::build_operator()
input__0.get_memory(),
x->mess_offset),
seq_id_dtype,
{x->mess_count, 3},
{},
{x->mess_count, 3},
{},
0);

// Build the results
Expand All @@ -134,8 +134,12 @@ PreprocessFILStage::subscribe_fn_t PreprocessFILStage::build_operator()

output.on_next(std::move(next));
},
[&](std::exception_ptr error_ptr) { output.on_error(error_ptr); },
[&]() { output.on_completed(); }));
[&](std::exception_ptr error_ptr) {
output.on_error(error_ptr);
},
[&]() {
output.on_completed();
}));
};
}

Expand All @@ -144,50 +148,43 @@ TableInfo PreprocessFILStage::fix_bad_columns(sink_type_t x)
std::vector<std::string> bad_cols;

{
// TODO(MDD): Add some sort of lock here to prevent fixing columns after they have been accessed
auto df_meta = x->get_meta(m_fea_cols);
auto df_meta_col_names = df_meta.get_column_names();
// Get the mutable info for the entire meta object so we only do this once per dataframe
auto mutable_info = x->meta->get_mutable_info();
auto df_meta_col_names = mutable_info.get_column_names();

for (size_t i = 0; i < df_meta.num_columns(); ++i)
for (size_t i = 0; i < mutable_info.num_columns(); ++i)
{
if (df_meta.get_column(i).type().id() == cudf::type_id::STRING)
if (mutable_info.get_column(i).type().id() == cudf::type_id::STRING)
{
bad_cols.push_back(df_meta_col_names[i]);
}
}

// Exit early if there is nothing to do
if (bad_cols.empty())
if (!bad_cols.empty())
{
return df_meta;
}
}
// Need to ensure all string columns have been converted to numbers. This requires running a
// regex which is too difficult to do from C++ at this time. So grab the GIL, make the
// conversions, and release. This is horribly inefficient, but so is the JSON lines format for
// this workflow
using namespace pybind11::literals;
pybind11::gil_scoped_acquire gil;

// Need to ensure all string columns have been converted to numbers. This requires running a
// regex which is too difficult to do from C++ at this time. So grab the GIL, make the
// conversions, and release. This is horribly inefficient, but so is the JSON lines format for
// this workflow
{
// Get the mutable info for the entire meta object so we only do this once per dataframe
auto mutable_info = x->meta->get_mutable_info();
// pybind11::object df = x->meta->get_py_table();
auto df = mutable_info.checkout_obj();

using namespace pybind11::literals;
pybind11::gil_scoped_acquire gil;
std::string regex = R"((\d+))";

// pybind11::object df = x->meta->get_py_table();
auto df = mutable_info.checkout_obj();

std::string regex = R"((\d+))";
for (auto c : bad_cols)
{
df[pybind11::str(c)] = df[pybind11::str(c)]
.attr("str")
.attr("extract")(pybind11::str(regex), "expand"_a = true)
.attr("astype")(pybind11::str("float32"));
}

for (auto c : bad_cols)
{
df[pybind11::str(c)] = df[pybind11::str(c)]
.attr("str")
.attr("extract")(pybind11::str(regex), "expand"_a = true)
.attr("astype")(pybind11::str("float32"));
mutable_info.return_obj(std::move(df));
}

mutable_info.return_obj(std::move(df));
}

// Now re-get the meta
Expand Down
Loading