Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[opt](mysql serde) Avoid core dump when converting invalid block to mysql result #28069

Merged
merged 7 commits into from
Dec 8, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 8 additions & 4 deletions be/src/vec/data_types/serde/data_type_array_serde.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -299,18 +299,22 @@ void DataTypeArraySerDe::read_column_from_arrow(IColumn& column, const arrow::Ar
template <bool is_binary_format>
Status DataTypeArraySerDe::_write_column_to_mysql(const IColumn& column,
MysqlRowBuffer<is_binary_format>& result,
int row_idx, bool col_const) const {
int row_idx_of_mysql, bool col_const) const {
auto& column_array = assert_cast<const ColumnArray&>(column);
auto& offsets = column_array.get_offsets();
auto& data = column_array.get_data();
bool is_nested_string = data.is_column_string();
const auto col_index = index_check_const(row_idx, col_const);
const auto row_idx_of_col_arr = index_check_const(row_idx_of_mysql, col_const);
result.open_dynamic_mode();

if (0 != result.push_string("[", 1)) {
return Status::InternalError("pack mysql buffer failed.");
}
for (int j = offsets[col_index - 1]; j < offsets[col_index]; ++j) {
if (j != offsets[col_index - 1]) {

const auto begin_arr_element = offsets[row_idx_of_col_arr - 1];
const auto end_arr_element = offsets[row_idx_of_col_arr];
for (int j = begin_arr_element; j < end_arr_element; ++j) {
if (j != begin_arr_element) {
if (0 != result.push_string(", ", 2)) {
return Status::InternalError("pack mysql buffer failed.");
}
Expand Down
40 changes: 28 additions & 12 deletions be/src/vec/sink/vmysql_result_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -112,17 +112,17 @@ Status VMysqlResultWriter<is_binary_format>::append_block(Block& input_block) {
return status;
}

DCHECK(_output_vexpr_ctxs.empty() != true);

// Exec vectorized expr here to speed up, block.rows() == 0 means expr exec
// failed, just return the error status
Block block;
RETURN_IF_ERROR(VExprContext::get_output_block_after_execute_exprs(_output_vexpr_ctxs,
input_block, &block));

// convert one batch
auto result = std::make_unique<TFetchDataResult>();
auto num_rows = block.rows();
result->result_batch.rows.resize(num_rows);

uint64_t bytes_sent = 0;
{
SCOPED_TIMER(_convert_tuple_timer);
Expand All @@ -138,15 +138,19 @@ Status VMysqlResultWriter<is_binary_format>::append_block(Block& input_block) {
DataTypeSerDeSPtr serde;
};

const size_t num_cols = _output_vexpr_ctxs.size();
std::vector<Arguments> arguments;
for (int i = 0; i < _output_vexpr_ctxs.size(); ++i) {
const auto& [column_ptr, col_const] = unpack_if_const(block.get_by_position(i).column);
int scale = _output_vexpr_ctxs[i]->root()->type().scale;
arguments.reserve(num_cols);

for (size_t col_idx = 0; col_idx < num_cols; ++col_idx) {
const auto& [column_ptr, col_const] =
unpack_if_const(block.get_by_position(col_idx).column);
int scale = _output_vexpr_ctxs[col_idx]->root()->type().scale;
// decimalv2 scale and precision is hard code, so we should get real scale and precision
// from expr
DataTypeSerDeSPtr serde;
if (_output_vexpr_ctxs[i]->root()->type().is_decimal_v2_type()) {
if (_output_vexpr_ctxs[i]->root()->is_nullable()) {
if (_output_vexpr_ctxs[col_idx]->root()->type().is_decimal_v2_type()) {
if (_output_vexpr_ctxs[col_idx]->root()->is_nullable()) {
auto nested_serde =
std::make_shared<DataTypeDecimalSerDe<vectorized::Decimal128>>(scale,
27);
Expand All @@ -156,16 +160,28 @@ Status VMysqlResultWriter<is_binary_format>::append_block(Block& input_block) {
27);
}
} else {
serde = block.get_by_position(i).type->get_serde();
serde = block.get_by_position(col_idx).type->get_serde();
}
serde->set_return_object_as_string(output_object_data());
arguments.emplace_back(column_ptr.get(), col_const, serde);
}

for (size_t row_idx = 0; row_idx != num_rows; ++row_idx) {
for (int i = 0; i < _output_vexpr_ctxs.size(); ++i) {
RETURN_IF_ERROR(arguments[i].serde->write_column_to_mysql(
*(arguments[i].column), row_buffer, row_idx, arguments[i].is_const));
for (size_t col_idx = 0; col_idx < num_cols; ++col_idx) {
const auto& argument = arguments[col_idx];
// const column will only have 1 row, see unpack_if_const
if (argument.column->size() < num_rows && !argument.is_const) {
return Status::InternalError(
"Required row size is out of range, need {} rows, column {} has {} "
"rows in fact.",
num_rows, argument.column->get_name(), argument.column->size());
}
}

for (size_t row_idx = 0; row_idx < num_rows; ++row_idx) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Don’t run this check for every row.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My fault.
The check has been split out of the per-row loop and moved in front of it.

for (size_t col_idx = 0; col_idx < num_cols; ++col_idx) {
RETURN_IF_ERROR(arguments[col_idx].serde->write_column_to_mysql(
*(arguments[col_idx].column), row_buffer, row_idx,
arguments[col_idx].is_const));
}

// copy MysqlRowBuffer to Thrift
Expand Down
Loading