Skip to content

Commit

Permalink
[opt](mysql serde) Avoid core dump when converting invalid block to m…
Browse files Browse the repository at this point in the history
…ysql result (apache#28069)

BE will core dump if the result block is invalid while doing result serialization.
An existing bug case is described in apache#28030, so we add a check branch to avoid a BE core dump due to out-of-range related problems.
  • Loading branch information
zhiqiang-hhhh authored and 胥剑旭 committed Dec 14, 2023
1 parent b379059 commit ae21992
Show file tree
Hide file tree
Showing 2 changed files with 36 additions and 16 deletions.
12 changes: 8 additions & 4 deletions be/src/vec/data_types/serde/data_type_array_serde.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -299,18 +299,22 @@ void DataTypeArraySerDe::read_column_from_arrow(IColumn& column, const arrow::Ar
template <bool is_binary_format>
Status DataTypeArraySerDe::_write_column_to_mysql(const IColumn& column,
MysqlRowBuffer<is_binary_format>& result,
int row_idx, bool col_const) const {
int row_idx_of_mysql, bool col_const) const {
auto& column_array = assert_cast<const ColumnArray&>(column);
auto& offsets = column_array.get_offsets();
auto& data = column_array.get_data();
bool is_nested_string = data.is_column_string();
const auto col_index = index_check_const(row_idx, col_const);
const auto row_idx_of_col_arr = index_check_const(row_idx_of_mysql, col_const);
result.open_dynamic_mode();

if (0 != result.push_string("[", 1)) {
return Status::InternalError("pack mysql buffer failed.");
}
for (int j = offsets[col_index - 1]; j < offsets[col_index]; ++j) {
if (j != offsets[col_index - 1]) {

const auto begin_arr_element = offsets[row_idx_of_col_arr - 1];
const auto end_arr_element = offsets[row_idx_of_col_arr];
for (int j = begin_arr_element; j < end_arr_element; ++j) {
if (j != begin_arr_element) {
if (0 != result.push_string(", ", 2)) {
return Status::InternalError("pack mysql buffer failed.");
}
Expand Down
40 changes: 28 additions & 12 deletions be/src/vec/sink/vmysql_result_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -112,17 +112,17 @@ Status VMysqlResultWriter<is_binary_format>::append_block(Block& input_block) {
return status;
}

DCHECK(_output_vexpr_ctxs.empty() != true);

// Exec vectorized expr here to speed up, block.rows() == 0 means expr exec
// failed, just return the error status
Block block;
RETURN_IF_ERROR(VExprContext::get_output_block_after_execute_exprs(_output_vexpr_ctxs,
input_block, &block));

// convert one batch
auto result = std::make_unique<TFetchDataResult>();
auto num_rows = block.rows();
result->result_batch.rows.resize(num_rows);

uint64_t bytes_sent = 0;
{
SCOPED_TIMER(_convert_tuple_timer);
Expand All @@ -138,15 +138,19 @@ Status VMysqlResultWriter<is_binary_format>::append_block(Block& input_block) {
DataTypeSerDeSPtr serde;
};

const size_t num_cols = _output_vexpr_ctxs.size();
std::vector<Arguments> arguments;
for (int i = 0; i < _output_vexpr_ctxs.size(); ++i) {
const auto& [column_ptr, col_const] = unpack_if_const(block.get_by_position(i).column);
int scale = _output_vexpr_ctxs[i]->root()->type().scale;
arguments.reserve(num_cols);

for (size_t col_idx = 0; col_idx < num_cols; ++col_idx) {
const auto& [column_ptr, col_const] =
unpack_if_const(block.get_by_position(col_idx).column);
int scale = _output_vexpr_ctxs[col_idx]->root()->type().scale;
// decimalv2 scale and precision is hard code, so we should get real scale and precision
// from expr
DataTypeSerDeSPtr serde;
if (_output_vexpr_ctxs[i]->root()->type().is_decimal_v2_type()) {
if (_output_vexpr_ctxs[i]->root()->is_nullable()) {
if (_output_vexpr_ctxs[col_idx]->root()->type().is_decimal_v2_type()) {
if (_output_vexpr_ctxs[col_idx]->root()->is_nullable()) {
auto nested_serde =
std::make_shared<DataTypeDecimalSerDe<vectorized::Decimal128>>(scale,
27);
Expand All @@ -156,16 +160,28 @@ Status VMysqlResultWriter<is_binary_format>::append_block(Block& input_block) {
27);
}
} else {
serde = block.get_by_position(i).type->get_serde();
serde = block.get_by_position(col_idx).type->get_serde();
}
serde->set_return_object_as_string(output_object_data());
arguments.emplace_back(column_ptr.get(), col_const, serde);
}

for (size_t row_idx = 0; row_idx != num_rows; ++row_idx) {
for (int i = 0; i < _output_vexpr_ctxs.size(); ++i) {
RETURN_IF_ERROR(arguments[i].serde->write_column_to_mysql(
*(arguments[i].column), row_buffer, row_idx, arguments[i].is_const));
for (size_t col_idx = 0; col_idx < num_cols; ++col_idx) {
const auto& argument = arguments[col_idx];
// const column will only have 1 row, see unpack_if_const
if (argument.column->size() < num_rows && !argument.is_const) {
return Status::InternalError(
"Required row size is out of range, need {} rows, column {} has {} "
"rows in fact.",
num_rows, argument.column->get_name(), argument.column->size());
}
}

for (size_t row_idx = 0; row_idx < num_rows; ++row_idx) {
for (size_t col_idx = 0; col_idx < num_cols; ++col_idx) {
RETURN_IF_ERROR(arguments[col_idx].serde->write_column_to_mysql(
*(arguments[col_idx].column), row_buffer, row_idx,
arguments[col_idx].is_const));
}

// copy MysqlRowBuffer to Thrift
Expand Down

0 comments on commit ae21992

Please sign in to comment.