-
Notifications
You must be signed in to change notification settings - Fork 3.4k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[Improvement](sink) optimization for parallel result sink #36305
Conversation
run buildall |
clang-tidy review says "All clean, LGTM! 👍" |
run buildall |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
clang-tidy made some suggestions
@@ -102,7 +102,7 @@ void VMysqlResultWriter<is_binary_format>::_init_profile() { | |||
} | |||
|
|||
template <bool is_binary_format> | |||
Status VMysqlResultWriter<is_binary_format>::write(Block& input_block) { | |||
Status VMysqlResultWriter<is_binary_format>::write(RuntimeState* state, Block& input_block) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
warning: function 'write' exceeds recommended size/complexity thresholds [readability-function-size]
Status VMysqlResultWriter<is_binary_format>::write(RuntimeState* state, Block& input_block) {
^
Additional context
be/src/vec/sink/vmysql_result_writer.cpp:104: 107 lines including whitespace and comments (threshold 80)
Status VMysqlResultWriter<is_binary_format>::write(RuntimeState* state, Block& input_block) {
^
@@ -105,7 +105,7 @@ VIcebergTableWriter::_to_iceberg_partition_columns() { | |||
return partition_columns; | |||
} | |||
|
|||
Status VIcebergTableWriter::write(vectorized::Block& block) { | |||
Status VIcebergTableWriter::write(RuntimeState* state, vectorized::Block& block) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
warning: function 'write' has cognitive complexity of 85 (threshold 50) [readability-function-cognitive-complexity]
Status VIcebergTableWriter::write(RuntimeState* state, vectorized::Block& block) {
^
Additional context
be/src/vec/sink/writer/iceberg/viceberg_table_writer.cpp:109: +1, including nesting penalty of 0, nesting level increased to 1
if (block.rows() == 0) {
^
be/src/vec/sink/writer/iceberg/viceberg_table_writer.cpp:113: +1, including nesting penalty of 0, nesting level increased to 1
RETURN_IF_ERROR(vectorized::VExprContext::get_output_block_after_execute_exprs(
^
be/src/common/status.h:626: expanded from macro 'RETURN_IF_ERROR'
do { \
^
be/src/vec/sink/writer/iceberg/viceberg_table_writer.cpp:113: +2, including nesting penalty of 1, nesting level increased to 2
RETURN_IF_ERROR(vectorized::VExprContext::get_output_block_after_execute_exprs(
^
be/src/common/status.h:628: expanded from macro 'RETURN_IF_ERROR'
if (UNLIKELY(!_status_.ok())) { \
^
be/src/vec/sink/writer/iceberg/viceberg_table_writer.cpp:120: +1, including nesting penalty of 0, nesting level increased to 1
if (_iceberg_partition_columns.empty()) {
^
be/src/vec/sink/writer/iceberg/viceberg_table_writer.cpp:125: +2, including nesting penalty of 1, nesting level increased to 2
if (writer_iter == _partitions_to_writers.end()) {
^
be/src/vec/sink/writer/iceberg/viceberg_table_writer.cpp:128: +3, including nesting penalty of 2, nesting level increased to 3
} catch (doris::Exception& e) {
^
be/src/vec/sink/writer/iceberg/viceberg_table_writer.cpp:132: +3, including nesting penalty of 2, nesting level increased to 3
RETURN_IF_ERROR(writer->open(_state, _profile));
^
be/src/common/status.h:626: expanded from macro 'RETURN_IF_ERROR'
do { \
^
be/src/vec/sink/writer/iceberg/viceberg_table_writer.cpp:132: +4, including nesting penalty of 3, nesting level increased to 4
RETURN_IF_ERROR(writer->open(_state, _profile));
^
be/src/common/status.h:628: expanded from macro 'RETURN_IF_ERROR'
if (UNLIKELY(!_status_.ok())) { \
^
be/src/vec/sink/writer/iceberg/viceberg_table_writer.cpp:133: +1, nesting level increased to 2
} else {
^
be/src/vec/sink/writer/iceberg/viceberg_table_writer.cpp:134: +3, including nesting penalty of 2, nesting level increased to 3
if (writer_iter->second->written_len() > config::iceberg_sink_max_file_size) {
^
be/src/vec/sink/writer/iceberg/viceberg_table_writer.cpp:145: +4, including nesting penalty of 3, nesting level increased to 4
} catch (doris::Exception& e) {
^
be/src/vec/sink/writer/iceberg/viceberg_table_writer.cpp:149: +4, including nesting penalty of 3, nesting level increased to 4
RETURN_IF_ERROR(writer->open(_state, _profile));
^
be/src/common/status.h:626: expanded from macro 'RETURN_IF_ERROR'
do { \
^
be/src/vec/sink/writer/iceberg/viceberg_table_writer.cpp:149: +5, including nesting penalty of 4, nesting level increased to 5
RETURN_IF_ERROR(writer->open(_state, _profile));
^
be/src/common/status.h:628: expanded from macro 'RETURN_IF_ERROR'
if (UNLIKELY(!_status_.ok())) { \
^
be/src/vec/sink/writer/iceberg/viceberg_table_writer.cpp:150: +1, nesting level increased to 3
} else {
^
be/src/vec/sink/writer/iceberg/viceberg_table_writer.cpp:157: +2, including nesting penalty of 1, nesting level increased to 2
RETURN_IF_ERROR(writer->write(output_block));
^
be/src/common/status.h:626: expanded from macro 'RETURN_IF_ERROR'
do { \
^
be/src/vec/sink/writer/iceberg/viceberg_table_writer.cpp:157: +3, including nesting penalty of 2, nesting level increased to 3
RETURN_IF_ERROR(writer->write(output_block));
^
be/src/common/status.h:628: expanded from macro 'RETURN_IF_ERROR'
if (UNLIKELY(!_status_.ok())) { \
^
be/src/vec/sink/writer/iceberg/viceberg_table_writer.cpp:168: +1, including nesting penalty of 0, nesting level increased to 1
for (int i = 0; i < output_block.rows(); ++i) {
^
be/src/vec/sink/writer/iceberg/viceberg_table_writer.cpp:172: +2, including nesting penalty of 1, nesting level increased to 2
} catch (doris::Exception& e) {
^
be/src/vec/sink/writer/iceberg/viceberg_table_writer.cpp:179: +2, including nesting penalty of 1, nesting level increased to 2
} catch (doris::Exception& e) {
^
be/src/vec/sink/writer/iceberg/viceberg_table_writer.cpp:183: nesting level increased to 2
[&](const std::string& partition_name, int position,
^
be/src/vec/sink/writer/iceberg/viceberg_table_writer.cpp:189: +3, including nesting penalty of 2, nesting level increased to 3
RETURN_IF_ERROR(writer->open(_state, _profile));
^
be/src/common/status.h:626: expanded from macro 'RETURN_IF_ERROR'
do { \
^
be/src/vec/sink/writer/iceberg/viceberg_table_writer.cpp:189: +4, including nesting penalty of 3, nesting level increased to 4
RETURN_IF_ERROR(writer->open(_state, _profile));
^
be/src/common/status.h:628: expanded from macro 'RETURN_IF_ERROR'
if (UNLIKELY(!_status_.ok())) { \
^
be/src/vec/sink/writer/iceberg/viceberg_table_writer.cpp:195: +3, including nesting penalty of 2, nesting level increased to 3
} catch (doris::Exception& e) {
^
be/src/vec/sink/writer/iceberg/viceberg_table_writer.cpp:202: +2, including nesting penalty of 1, nesting level increased to 2
if (writer_iter == _partitions_to_writers.end()) {
^
be/src/vec/sink/writer/iceberg/viceberg_table_writer.cpp:204: +3, including nesting penalty of 2, nesting level increased to 3
if (_partitions_to_writers.size() + 1 >
^
be/src/vec/sink/writer/iceberg/viceberg_table_writer.cpp:210: +3, including nesting penalty of 2, nesting level increased to 3
RETURN_IF_ERROR(create_and_open_writer(partition_name, i, nullptr, 0, writer));
^
be/src/common/status.h:626: expanded from macro 'RETURN_IF_ERROR'
do { \
^
be/src/vec/sink/writer/iceberg/viceberg_table_writer.cpp:210: +4, including nesting penalty of 3, nesting level increased to 4
RETURN_IF_ERROR(create_and_open_writer(partition_name, i, nullptr, 0, writer));
^
be/src/common/status.h:628: expanded from macro 'RETURN_IF_ERROR'
if (UNLIKELY(!_status_.ok())) { \
^
be/src/vec/sink/writer/iceberg/viceberg_table_writer.cpp:211: +1, nesting level increased to 2
} else {
^
be/src/vec/sink/writer/iceberg/viceberg_table_writer.cpp:213: +3, including nesting penalty of 2, nesting level increased to 3
if (writer_iter->second->written_len() > config::iceberg_sink_max_file_size) {
^
be/src/vec/sink/writer/iceberg/viceberg_table_writer.cpp:222: +4, including nesting penalty of 3, nesting level increased to 4
RETURN_IF_ERROR(create_and_open_writer(partition_name, i, &file_name,
^
be/src/common/status.h:626: expanded from macro 'RETURN_IF_ERROR'
do { \
^
be/src/vec/sink/writer/iceberg/viceberg_table_writer.cpp:222: +5, including nesting penalty of 4, nesting level increased to 5
RETURN_IF_ERROR(create_and_open_writer(partition_name, i, &file_name,
^
be/src/common/status.h:628: expanded from macro 'RETURN_IF_ERROR'
if (UNLIKELY(!_status_.ok())) { \
^
be/src/vec/sink/writer/iceberg/viceberg_table_writer.cpp:224: +1, nesting level increased to 3
} else {
^
be/src/vec/sink/writer/iceberg/viceberg_table_writer.cpp:228: +3, including nesting penalty of 2, nesting level increased to 3
if (writer_pos_iter == writer_positions.end()) {
^
be/src/vec/sink/writer/iceberg/viceberg_table_writer.cpp:232: +1, nesting level increased to 3
} else {
^
@@ -105,7 +105,7 @@ | |||
return partition_columns; | |||
} | |||
|
|||
Status VIcebergTableWriter::write(vectorized::Block& block) { | |||
Status VIcebergTableWriter::write(RuntimeState* state, vectorized::Block& block) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
warning: function 'write' exceeds recommended size/complexity thresholds [readability-function-size]
Status VIcebergTableWriter::write(RuntimeState* state, vectorized::Block& block) {
^
Additional context
be/src/vec/sink/writer/iceberg/viceberg_table_writer.cpp:107: 139 lines including whitespace and comments (threshold 80)
Status VIcebergTableWriter::write(RuntimeState* state, vectorized::Block& block) {
^
@@ -81,7 +81,7 @@ Status VHiveTableWriter::open(RuntimeState* state, RuntimeProfile* profile) { | |||
return Status::OK(); | |||
} | |||
|
|||
Status VHiveTableWriter::write(vectorized::Block& block) { | |||
Status VHiveTableWriter::write(RuntimeState* state, vectorized::Block& block) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
warning: function 'write' has cognitive complexity of 83 (threshold 50) [readability-function-cognitive-complexity]
Status VHiveTableWriter::write(RuntimeState* state, vectorized::Block& block) {
^
Additional context
be/src/vec/sink/writer/vhive_table_writer.cpp:86: +1, including nesting penalty of 0, nesting level increased to 1
if (block.rows() == 0) {
^
be/src/vec/sink/writer/vhive_table_writer.cpp:90: +1, including nesting penalty of 0, nesting level increased to 1
RETURN_IF_ERROR(vectorized::VExprContext::get_output_block_after_execute_exprs(
^
be/src/common/status.h:626: expanded from macro 'RETURN_IF_ERROR'
do { \
^
be/src/vec/sink/writer/vhive_table_writer.cpp:90: +2, including nesting penalty of 1, nesting level increased to 2
RETURN_IF_ERROR(vectorized::VExprContext::get_output_block_after_execute_exprs(
^
be/src/common/status.h:628: expanded from macro 'RETURN_IF_ERROR'
if (UNLIKELY(!_status_.ok())) { \
^
be/src/vec/sink/writer/vhive_table_writer.cpp:98: +1, including nesting penalty of 0, nesting level increased to 1
if (_partition_columns_input_index.empty()) {
^
be/src/vec/sink/writer/vhive_table_writer.cpp:103: +2, including nesting penalty of 1, nesting level increased to 2
if (writer_iter == _partitions_to_writers.end()) {
^
be/src/vec/sink/writer/vhive_table_writer.cpp:106: +3, including nesting penalty of 2, nesting level increased to 3
} catch (doris::Exception& e) {
^
be/src/vec/sink/writer/vhive_table_writer.cpp:110: +3, including nesting penalty of 2, nesting level increased to 3
RETURN_IF_ERROR(writer->open(_state, _profile));
^
be/src/common/status.h:626: expanded from macro 'RETURN_IF_ERROR'
do { \
^
be/src/vec/sink/writer/vhive_table_writer.cpp:110: +4, including nesting penalty of 3, nesting level increased to 4
RETURN_IF_ERROR(writer->open(_state, _profile));
^
be/src/common/status.h:628: expanded from macro 'RETURN_IF_ERROR'
if (UNLIKELY(!_status_.ok())) { \
^
be/src/vec/sink/writer/vhive_table_writer.cpp:111: +1, nesting level increased to 2
} else {
^
be/src/vec/sink/writer/vhive_table_writer.cpp:112: +3, including nesting penalty of 2, nesting level increased to 3
if (writer_iter->second->written_len() > config::hive_sink_max_file_size) {
^
be/src/vec/sink/writer/vhive_table_writer.cpp:123: +4, including nesting penalty of 3, nesting level increased to 4
} catch (doris::Exception& e) {
^
be/src/vec/sink/writer/vhive_table_writer.cpp:127: +4, including nesting penalty of 3, nesting level increased to 4
RETURN_IF_ERROR(writer->open(_state, _profile));
^
be/src/common/status.h:626: expanded from macro 'RETURN_IF_ERROR'
do { \
^
be/src/vec/sink/writer/vhive_table_writer.cpp:127: +5, including nesting penalty of 4, nesting level increased to 5
RETURN_IF_ERROR(writer->open(_state, _profile));
^
be/src/common/status.h:628: expanded from macro 'RETURN_IF_ERROR'
if (UNLIKELY(!_status_.ok())) { \
^
be/src/vec/sink/writer/vhive_table_writer.cpp:128: +1, nesting level increased to 3
} else {
^
be/src/vec/sink/writer/vhive_table_writer.cpp:135: +2, including nesting penalty of 1, nesting level increased to 2
RETURN_IF_ERROR(writer->write(output_block));
^
be/src/common/status.h:626: expanded from macro 'RETURN_IF_ERROR'
do { \
^
be/src/vec/sink/writer/vhive_table_writer.cpp:135: +3, including nesting penalty of 2, nesting level increased to 3
RETURN_IF_ERROR(writer->write(output_block));
^
be/src/common/status.h:628: expanded from macro 'RETURN_IF_ERROR'
if (UNLIKELY(!_status_.ok())) { \
^
be/src/vec/sink/writer/vhive_table_writer.cpp:141: +1, including nesting penalty of 0, nesting level increased to 1
for (int i = 0; i < output_block.rows(); ++i) {
^
be/src/vec/sink/writer/vhive_table_writer.cpp:145: +2, including nesting penalty of 1, nesting level increased to 2
} catch (doris::Exception& e) {
^
be/src/vec/sink/writer/vhive_table_writer.cpp:152: nesting level increased to 2
[&](const std::string& partition_name, int position,
^
be/src/vec/sink/writer/vhive_table_writer.cpp:158: +3, including nesting penalty of 2, nesting level increased to 3
RETURN_IF_ERROR(writer->open(_state, _profile));
^
be/src/common/status.h:626: expanded from macro 'RETURN_IF_ERROR'
do { \
^
be/src/vec/sink/writer/vhive_table_writer.cpp:158: +4, including nesting penalty of 3, nesting level increased to 4
RETURN_IF_ERROR(writer->open(_state, _profile));
^
be/src/common/status.h:628: expanded from macro 'RETURN_IF_ERROR'
if (UNLIKELY(!_status_.ok())) { \
^
be/src/vec/sink/writer/vhive_table_writer.cpp:164: +3, including nesting penalty of 2, nesting level increased to 3
} catch (doris::Exception& e) {
^
be/src/vec/sink/writer/vhive_table_writer.cpp:171: +2, including nesting penalty of 1, nesting level increased to 2
if (writer_iter == _partitions_to_writers.end()) {
^
be/src/vec/sink/writer/vhive_table_writer.cpp:173: +3, including nesting penalty of 2, nesting level increased to 3
if (_partitions_to_writers.size() + 1 >
^
be/src/vec/sink/writer/vhive_table_writer.cpp:179: +3, including nesting penalty of 2, nesting level increased to 3
RETURN_IF_ERROR(create_and_open_writer(partition_name, i, nullptr, 0, writer));
^
be/src/common/status.h:626: expanded from macro 'RETURN_IF_ERROR'
do { \
^
be/src/vec/sink/writer/vhive_table_writer.cpp:179: +4, including nesting penalty of 3, nesting level increased to 4
RETURN_IF_ERROR(create_and_open_writer(partition_name, i, nullptr, 0, writer));
^
be/src/common/status.h:628: expanded from macro 'RETURN_IF_ERROR'
if (UNLIKELY(!_status_.ok())) { \
^
be/src/vec/sink/writer/vhive_table_writer.cpp:180: +1, nesting level increased to 2
} else {
^
be/src/vec/sink/writer/vhive_table_writer.cpp:182: +3, including nesting penalty of 2, nesting level increased to 3
if (writer_iter->second->written_len() > config::hive_sink_max_file_size) {
^
be/src/vec/sink/writer/vhive_table_writer.cpp:191: +4, including nesting penalty of 3, nesting level increased to 4
RETURN_IF_ERROR(create_and_open_writer(partition_name, i, &file_name,
^
be/src/common/status.h:626: expanded from macro 'RETURN_IF_ERROR'
do { \
^
be/src/vec/sink/writer/vhive_table_writer.cpp:191: +5, including nesting penalty of 4, nesting level increased to 5
RETURN_IF_ERROR(create_and_open_writer(partition_name, i, &file_name,
^
be/src/common/status.h:628: expanded from macro 'RETURN_IF_ERROR'
if (UNLIKELY(!_status_.ok())) { \
^
be/src/vec/sink/writer/vhive_table_writer.cpp:193: +1, nesting level increased to 3
} else {
^
be/src/vec/sink/writer/vhive_table_writer.cpp:197: +3, including nesting penalty of 2, nesting level increased to 3
if (writer_pos_iter == writer_positions.end()) {
^
be/src/vec/sink/writer/vhive_table_writer.cpp:201: +1, nesting level increased to 3
} else {
^
@@ -81,7 +81,7 @@ | |||
return Status::OK(); | |||
} | |||
|
|||
Status VHiveTableWriter::write(vectorized::Block& block) { | |||
Status VHiveTableWriter::write(RuntimeState* state, vectorized::Block& block) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
warning: function 'write' exceeds recommended size/complexity thresholds [readability-function-size]
Status VHiveTableWriter::write(RuntimeState* state, vectorized::Block& block) {
^
Additional context
be/src/vec/sink/writer/vhive_table_writer.cpp:83: 132 lines including whitespace and comments (threshold 80)
Status VHiveTableWriter::write(RuntimeState* state, vectorized::Block& block) {
^
run buildall |
1 similar comment
run buildall |
dependency->set_ready(); | ||
if (_is_cancelled) { | ||
for (auto it : _result_sink_dependencys) { | ||
it.second->set_ready(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
it.second->set_ready(); | |
it.second->set_always_ready(); |
PR approved by at least one committer and no changes requested. |
PR approved by anyone and no changes requested. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
run buildall |
run buildall |
run performance |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
PR approved by at least one committer and no changes requested. |
…ache#36305)" This reverts commit fdb5891.
## Proposed changes optimization for parallel result sink apache#36053
## Proposed changes optimization for parallel result sink #36053
Proposed changes
optimization for parallel result sink #36053