From ae21992932dd63a2195a23f7a6ed2a84e3b696df Mon Sep 17 00:00:00 2001 From: zhiqiang Date: Fri, 8 Dec 2023 10:21:09 +0800 Subject: [PATCH] [opt](mysql serde) Avoid core dump when converting invalid block to mysql result (#28069) BE will core dump if result block is invalid when we doing result serialization. An existing bug case is described in #28030, so we add check branch to avoid BE core dump due to out of range related problem. --- .../serde/data_type_array_serde.cpp | 12 ++++-- be/src/vec/sink/vmysql_result_writer.cpp | 40 +++++++++++++------ 2 files changed, 36 insertions(+), 16 deletions(-) diff --git a/be/src/vec/data_types/serde/data_type_array_serde.cpp b/be/src/vec/data_types/serde/data_type_array_serde.cpp index 91dfa8452e2778..7aa5ab78cc7bea 100644 --- a/be/src/vec/data_types/serde/data_type_array_serde.cpp +++ b/be/src/vec/data_types/serde/data_type_array_serde.cpp @@ -299,18 +299,22 @@ void DataTypeArraySerDe::read_column_from_arrow(IColumn& column, const arrow::Ar template Status DataTypeArraySerDe::_write_column_to_mysql(const IColumn& column, MysqlRowBuffer& result, - int row_idx, bool col_const) const { + int row_idx_of_mysql, bool col_const) const { auto& column_array = assert_cast(column); auto& offsets = column_array.get_offsets(); auto& data = column_array.get_data(); bool is_nested_string = data.is_column_string(); - const auto col_index = index_check_const(row_idx, col_const); + const auto row_idx_of_col_arr = index_check_const(row_idx_of_mysql, col_const); result.open_dynamic_mode(); + if (0 != result.push_string("[", 1)) { return Status::InternalError("pack mysql buffer failed."); } - for (int j = offsets[col_index - 1]; j < offsets[col_index]; ++j) { - if (j != offsets[col_index - 1]) { + + const auto begin_arr_element = offsets[row_idx_of_col_arr - 1]; + const auto end_arr_element = offsets[row_idx_of_col_arr]; + for (int j = begin_arr_element; j < end_arr_element; ++j) { + if (j != begin_arr_element) { if (0 != result.push_string(", ", 2)) { return Status::InternalError("pack mysql buffer failed."); } diff --git a/be/src/vec/sink/vmysql_result_writer.cpp b/be/src/vec/sink/vmysql_result_writer.cpp index 867cd7dc1d0468..336f627fcc3a5f 100644 --- a/be/src/vec/sink/vmysql_result_writer.cpp +++ b/be/src/vec/sink/vmysql_result_writer.cpp @@ -112,17 +112,17 @@ Status VMysqlResultWriter::append_block(Block& input_block) { return status; } + DCHECK(_output_vexpr_ctxs.empty() != true); + // Exec vectorized expr here to speed up, block.rows() == 0 means expr exec // failed, just return the error status Block block; RETURN_IF_ERROR(VExprContext::get_output_block_after_execute_exprs(_output_vexpr_ctxs, input_block, &block)); - // convert one batch auto result = std::make_unique(); auto num_rows = block.rows(); result->result_batch.rows.resize(num_rows); - uint64_t bytes_sent = 0; { SCOPED_TIMER(_convert_tuple_timer); @@ -138,15 +138,19 @@ Status VMysqlResultWriter::append_block(Block& input_block) { DataTypeSerDeSPtr serde; }; + const size_t num_cols = _output_vexpr_ctxs.size(); std::vector arguments; - for (int i = 0; i < _output_vexpr_ctxs.size(); ++i) { - const auto& [column_ptr, col_const] = unpack_if_const(block.get_by_position(i).column); - int scale = _output_vexpr_ctxs[i]->root()->type().scale; + arguments.reserve(num_cols); + + for (size_t col_idx = 0; col_idx < num_cols; ++col_idx) { + const auto& [column_ptr, col_const] = + unpack_if_const(block.get_by_position(col_idx).column); + int scale = _output_vexpr_ctxs[col_idx]->root()->type().scale; // decimalv2 scale and precision is hard code, so we should get real scale and precision // from expr DataTypeSerDeSPtr serde; - if (_output_vexpr_ctxs[i]->root()->type().is_decimal_v2_type()) { - if (_output_vexpr_ctxs[i]->root()->is_nullable()) { + if (_output_vexpr_ctxs[col_idx]->root()->type().is_decimal_v2_type()) { + if (_output_vexpr_ctxs[col_idx]->root()->is_nullable()) { auto nested_serde = std::make_shared>(scale, 27); @@ -156,16 +160,28 @@ Status VMysqlResultWriter::append_block(Block& input_block) { 27); } } else { - serde = block.get_by_position(i).type->get_serde(); + serde = block.get_by_position(col_idx).type->get_serde(); } serde->set_return_object_as_string(output_object_data()); arguments.emplace_back(column_ptr.get(), col_const, serde); } - for (size_t row_idx = 0; row_idx != num_rows; ++row_idx) { - for (int i = 0; i < _output_vexpr_ctxs.size(); ++i) { - RETURN_IF_ERROR(arguments[i].serde->write_column_to_mysql( - *(arguments[i].column), row_buffer, row_idx, arguments[i].is_const)); + for (size_t col_idx = 0; col_idx < num_cols; ++col_idx) { + const auto& argument = arguments[col_idx]; + // const column will only have 1 row, see unpack_if_const + if (argument.column->size() < num_rows && !argument.is_const) { + return Status::InternalError( + "Required row size is out of range, need {} rows, column {} has {} " + "rows in fact.", + num_rows, argument.column->get_name(), argument.column->size()); + } + } + + for (size_t row_idx = 0; row_idx < num_rows; ++row_idx) { + for (size_t col_idx = 0; col_idx < num_cols; ++col_idx) { + RETURN_IF_ERROR(arguments[col_idx].serde->write_column_to_mysql( + *(arguments[col_idx].column), row_buffer, row_idx, + arguments[col_idx].is_const)); } // copy MysqlRowBuffer to Thrift