Skip to content

Commit

Permalink
Check out a mutable table info to fix a race condition where we potentially attempt to fix a column multiple times per df
Browse files Browse the repository at this point in the history
  • Loading branch information
dagardner-nv committed Nov 28, 2023
1 parent 5f00e78 commit 142ac8a
Showing 1 changed file with 31 additions and 34 deletions.
65 changes: 31 additions & 34 deletions morpheus/_lib/src/stages/preprocess_fil.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -121,8 +121,8 @@ PreprocessFILStage::subscribe_fn_t PreprocessFILStage::build_operator()
input__0.get_memory(),
x->mess_offset),
seq_id_dtype,
{x->mess_count, 3},
{},
{x->mess_count, 3},
{},
0);

// Build the results
Expand All @@ -134,8 +134,12 @@ PreprocessFILStage::subscribe_fn_t PreprocessFILStage::build_operator()

output.on_next(std::move(next));
},
[&](std::exception_ptr error_ptr) { output.on_error(error_ptr); },
[&]() { output.on_completed(); }));
[&](std::exception_ptr error_ptr) {
output.on_error(error_ptr);
},
[&]() {
output.on_completed();
}));
};
}

Expand All @@ -144,50 +148,43 @@ TableInfo PreprocessFILStage::fix_bad_columns(sink_type_t x)
std::vector<std::string> bad_cols;

{
// TODO(MDD): Add some sort of lock here to prevent fixing columns after they have been accessed
auto df_meta = x->get_meta(m_fea_cols);
auto df_meta_col_names = df_meta.get_column_names();
// Get the mutable info for the entire meta object so we only do this once per dataframe
auto mutable_info = x->meta->get_mutable_info();
auto df_meta_col_names = mutable_info.get_column_names();

for (size_t i = 0; i < df_meta.num_columns(); ++i)
for (size_t i = 0; i < mutable_info.num_columns(); ++i)
{
if (df_meta.get_column(i).type().id() == cudf::type_id::STRING)
if (mutable_info.get_column(i).type().id() == cudf::type_id::STRING)
{
bad_cols.push_back(df_meta_col_names[i]);
}
}

// Exit early if there is nothing to do
if (bad_cols.empty())
if (!bad_cols.empty())
{
return df_meta;
}
}
// Need to ensure all string columns have been converted to numbers. This requires running a
// regex which is too difficult to do from C++ at this time. So grab the GIL, make the
// conversions, and release. This is horribly inefficient, but so is the JSON lines format for
// this workflow
using namespace pybind11::literals;
pybind11::gil_scoped_acquire gil;

// Need to ensure all string columns have been converted to numbers. This requires running a
// regex which is too difficult to do from C++ at this time. So grab the GIL, make the
// conversions, and release. This is horribly inefficient, but so is the JSON lines format for
// this workflow
{
// Get the mutable info for the entire meta object so we only do this once per dataframe
auto mutable_info = x->meta->get_mutable_info();
// pybind11::object df = x->meta->get_py_table();
auto df = mutable_info.checkout_obj();

using namespace pybind11::literals;
pybind11::gil_scoped_acquire gil;
std::string regex = R"((\d+))";

// pybind11::object df = x->meta->get_py_table();
auto df = mutable_info.checkout_obj();

std::string regex = R"((\d+))";
for (auto c : bad_cols)
{
df[pybind11::str(c)] = df[pybind11::str(c)]
.attr("str")
.attr("extract")(pybind11::str(regex), "expand"_a = true)
.attr("astype")(pybind11::str("float32"));
}

for (auto c : bad_cols)
{
df[pybind11::str(c)] = df[pybind11::str(c)]
.attr("str")
.attr("extract")(pybind11::str(regex), "expand"_a = true)
.attr("astype")(pybind11::str("float32"));
mutable_info.return_obj(std::move(df));
}

mutable_info.return_obj(std::move(df));
}

// Now re-get the meta
Expand Down

0 comments on commit 142ac8a

Please sign in to comment.