Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Improvement](sink) optimization for parallel result sink #36305

Merged
merged 6 commits into from
Jun 19, 2024

Conversation

BiteTheDDDDt
Copy link
Contributor

@BiteTheDDDDt BiteTheDDDDt commented Jun 14, 2024

Proposed changes

optimization for parallel result sink #36053

@BiteTheDDDDt
Copy link
Contributor Author

run buildall

Copy link
Contributor

clang-tidy review says "All clean, LGTM! 👍"

@BiteTheDDDDt
Copy link
Contributor Author

run buildall

Copy link
Contributor

@github-actions github-actions bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

clang-tidy made some suggestions

@@ -102,7 +102,7 @@ void VMysqlResultWriter<is_binary_format>::_init_profile() {
}

template <bool is_binary_format>
Status VMysqlResultWriter<is_binary_format>::write(Block& input_block) {
Status VMysqlResultWriter<is_binary_format>::write(RuntimeState* state, Block& input_block) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

warning: function 'write' exceeds recommended size/complexity thresholds [readability-function-size]

Status VMysqlResultWriter<is_binary_format>::write(RuntimeState* state, Block& input_block) {
                                             ^
Additional context

be/src/vec/sink/vmysql_result_writer.cpp:104: 107 lines including whitespace and comments (threshold 80)

Status VMysqlResultWriter<is_binary_format>::write(RuntimeState* state, Block& input_block) {
                                             ^

@@ -105,7 +105,7 @@ VIcebergTableWriter::_to_iceberg_partition_columns() {
return partition_columns;
}

Status VIcebergTableWriter::write(vectorized::Block& block) {
Status VIcebergTableWriter::write(RuntimeState* state, vectorized::Block& block) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

warning: function 'write' has cognitive complexity of 85 (threshold 50) [readability-function-cognitive-complexity]

Status VIcebergTableWriter::write(RuntimeState* state, vectorized::Block& block) {
                            ^
Additional context

be/src/vec/sink/writer/iceberg/viceberg_table_writer.cpp:109: +1, including nesting penalty of 0, nesting level increased to 1

    if (block.rows() == 0) {
    ^

be/src/vec/sink/writer/iceberg/viceberg_table_writer.cpp:113: +1, including nesting penalty of 0, nesting level increased to 1

    RETURN_IF_ERROR(vectorized::VExprContext::get_output_block_after_execute_exprs(
    ^

be/src/common/status.h:626: expanded from macro 'RETURN_IF_ERROR'

    do {                                \
    ^

be/src/vec/sink/writer/iceberg/viceberg_table_writer.cpp:113: +2, including nesting penalty of 1, nesting level increased to 2

    RETURN_IF_ERROR(vectorized::VExprContext::get_output_block_after_execute_exprs(
    ^

be/src/common/status.h:628: expanded from macro 'RETURN_IF_ERROR'

        if (UNLIKELY(!_status_.ok())) { \
        ^

be/src/vec/sink/writer/iceberg/viceberg_table_writer.cpp:120: +1, including nesting penalty of 0, nesting level increased to 1

    if (_iceberg_partition_columns.empty()) {
    ^

be/src/vec/sink/writer/iceberg/viceberg_table_writer.cpp:125: +2, including nesting penalty of 1, nesting level increased to 2

            if (writer_iter == _partitions_to_writers.end()) {
            ^

be/src/vec/sink/writer/iceberg/viceberg_table_writer.cpp:128: +3, including nesting penalty of 2, nesting level increased to 3

                } catch (doris::Exception& e) {
                  ^

be/src/vec/sink/writer/iceberg/viceberg_table_writer.cpp:132: +3, including nesting penalty of 2, nesting level increased to 3

                RETURN_IF_ERROR(writer->open(_state, _profile));
                ^

be/src/common/status.h:626: expanded from macro 'RETURN_IF_ERROR'

    do {                                \
    ^

be/src/vec/sink/writer/iceberg/viceberg_table_writer.cpp:132: +4, including nesting penalty of 3, nesting level increased to 4

                RETURN_IF_ERROR(writer->open(_state, _profile));
                ^

be/src/common/status.h:628: expanded from macro 'RETURN_IF_ERROR'

        if (UNLIKELY(!_status_.ok())) { \
        ^

be/src/vec/sink/writer/iceberg/viceberg_table_writer.cpp:133: +1, nesting level increased to 2

            } else {
              ^

be/src/vec/sink/writer/iceberg/viceberg_table_writer.cpp:134: +3, including nesting penalty of 2, nesting level increased to 3

                if (writer_iter->second->written_len() > config::iceberg_sink_max_file_size) {
                ^

be/src/vec/sink/writer/iceberg/viceberg_table_writer.cpp:145: +4, including nesting penalty of 3, nesting level increased to 4

                    } catch (doris::Exception& e) {
                      ^

be/src/vec/sink/writer/iceberg/viceberg_table_writer.cpp:149: +4, including nesting penalty of 3, nesting level increased to 4

                    RETURN_IF_ERROR(writer->open(_state, _profile));
                    ^

be/src/common/status.h:626: expanded from macro 'RETURN_IF_ERROR'

    do {                                \
    ^

be/src/vec/sink/writer/iceberg/viceberg_table_writer.cpp:149: +5, including nesting penalty of 4, nesting level increased to 5

                    RETURN_IF_ERROR(writer->open(_state, _profile));
                    ^

be/src/common/status.h:628: expanded from macro 'RETURN_IF_ERROR'

        if (UNLIKELY(!_status_.ok())) { \
        ^

be/src/vec/sink/writer/iceberg/viceberg_table_writer.cpp:150: +1, nesting level increased to 3

                } else {
                  ^

be/src/vec/sink/writer/iceberg/viceberg_table_writer.cpp:157: +2, including nesting penalty of 1, nesting level increased to 2

        RETURN_IF_ERROR(writer->write(output_block));
        ^

be/src/common/status.h:626: expanded from macro 'RETURN_IF_ERROR'

    do {                                \
    ^

be/src/vec/sink/writer/iceberg/viceberg_table_writer.cpp:157: +3, including nesting penalty of 2, nesting level increased to 3

        RETURN_IF_ERROR(writer->write(output_block));
        ^

be/src/common/status.h:628: expanded from macro 'RETURN_IF_ERROR'

        if (UNLIKELY(!_status_.ok())) { \
        ^

be/src/vec/sink/writer/iceberg/viceberg_table_writer.cpp:168: +1, including nesting penalty of 0, nesting level increased to 1

        for (int i = 0; i < output_block.rows(); ++i) {
        ^

be/src/vec/sink/writer/iceberg/viceberg_table_writer.cpp:172: +2, including nesting penalty of 1, nesting level increased to 2

            } catch (doris::Exception& e) {
              ^

be/src/vec/sink/writer/iceberg/viceberg_table_writer.cpp:179: +2, including nesting penalty of 1, nesting level increased to 2

            } catch (doris::Exception& e) {
              ^

be/src/vec/sink/writer/iceberg/viceberg_table_writer.cpp:183: nesting level increased to 2

                    [&](const std::string& partition_name, int position,
                    ^

be/src/vec/sink/writer/iceberg/viceberg_table_writer.cpp:189: +3, including nesting penalty of 2, nesting level increased to 3

                    RETURN_IF_ERROR(writer->open(_state, _profile));
                    ^

be/src/common/status.h:626: expanded from macro 'RETURN_IF_ERROR'

    do {                                \
    ^

be/src/vec/sink/writer/iceberg/viceberg_table_writer.cpp:189: +4, including nesting penalty of 3, nesting level increased to 4

                    RETURN_IF_ERROR(writer->open(_state, _profile));
                    ^

be/src/common/status.h:628: expanded from macro 'RETURN_IF_ERROR'

        if (UNLIKELY(!_status_.ok())) { \
        ^

be/src/vec/sink/writer/iceberg/viceberg_table_writer.cpp:195: +3, including nesting penalty of 2, nesting level increased to 3

                } catch (doris::Exception& e) {
                  ^

be/src/vec/sink/writer/iceberg/viceberg_table_writer.cpp:202: +2, including nesting penalty of 1, nesting level increased to 2

            if (writer_iter == _partitions_to_writers.end()) {
            ^

be/src/vec/sink/writer/iceberg/viceberg_table_writer.cpp:204: +3, including nesting penalty of 2, nesting level increased to 3

                if (_partitions_to_writers.size() + 1 >
                ^

be/src/vec/sink/writer/iceberg/viceberg_table_writer.cpp:210: +3, including nesting penalty of 2, nesting level increased to 3

                RETURN_IF_ERROR(create_and_open_writer(partition_name, i, nullptr, 0, writer));
                ^

be/src/common/status.h:626: expanded from macro 'RETURN_IF_ERROR'

    do {                                \
    ^

be/src/vec/sink/writer/iceberg/viceberg_table_writer.cpp:210: +4, including nesting penalty of 3, nesting level increased to 4

                RETURN_IF_ERROR(create_and_open_writer(partition_name, i, nullptr, 0, writer));
                ^

be/src/common/status.h:628: expanded from macro 'RETURN_IF_ERROR'

        if (UNLIKELY(!_status_.ok())) { \
        ^

be/src/vec/sink/writer/iceberg/viceberg_table_writer.cpp:211: +1, nesting level increased to 2

            } else {
              ^

be/src/vec/sink/writer/iceberg/viceberg_table_writer.cpp:213: +3, including nesting penalty of 2, nesting level increased to 3

                if (writer_iter->second->written_len() > config::iceberg_sink_max_file_size) {
                ^

be/src/vec/sink/writer/iceberg/viceberg_table_writer.cpp:222: +4, including nesting penalty of 3, nesting level increased to 4

                    RETURN_IF_ERROR(create_and_open_writer(partition_name, i, &file_name,
                    ^

be/src/common/status.h:626: expanded from macro 'RETURN_IF_ERROR'

    do {                                \
    ^

be/src/vec/sink/writer/iceberg/viceberg_table_writer.cpp:222: +5, including nesting penalty of 4, nesting level increased to 5

                    RETURN_IF_ERROR(create_and_open_writer(partition_name, i, &file_name,
                    ^

be/src/common/status.h:628: expanded from macro 'RETURN_IF_ERROR'

        if (UNLIKELY(!_status_.ok())) { \
        ^

be/src/vec/sink/writer/iceberg/viceberg_table_writer.cpp:224: +1, nesting level increased to 3

                } else {
                  ^

be/src/vec/sink/writer/iceberg/viceberg_table_writer.cpp:228: +3, including nesting penalty of 2, nesting level increased to 3

                if (writer_pos_iter == writer_positions.end()) {
                ^

be/src/vec/sink/writer/iceberg/viceberg_table_writer.cpp:232: +1, nesting level increased to 3

                } else {
                  ^

@@ -105,7 +105,7 @@
return partition_columns;
}

Status VIcebergTableWriter::write(vectorized::Block& block) {
Status VIcebergTableWriter::write(RuntimeState* state, vectorized::Block& block) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

warning: function 'write' exceeds recommended size/complexity thresholds [readability-function-size]

Status VIcebergTableWriter::write(RuntimeState* state, vectorized::Block& block) {
                            ^
Additional context

be/src/vec/sink/writer/iceberg/viceberg_table_writer.cpp:107: 139 lines including whitespace and comments (threshold 80)

Status VIcebergTableWriter::write(RuntimeState* state, vectorized::Block& block) {
                            ^

@@ -81,7 +81,7 @@ Status VHiveTableWriter::open(RuntimeState* state, RuntimeProfile* profile) {
return Status::OK();
}

Status VHiveTableWriter::write(vectorized::Block& block) {
Status VHiveTableWriter::write(RuntimeState* state, vectorized::Block& block) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

warning: function 'write' has cognitive complexity of 83 (threshold 50) [readability-function-cognitive-complexity]

Status VHiveTableWriter::write(RuntimeState* state, vectorized::Block& block) {
                         ^
Additional context

be/src/vec/sink/writer/vhive_table_writer.cpp:86: +1, including nesting penalty of 0, nesting level increased to 1

    if (block.rows() == 0) {
    ^

be/src/vec/sink/writer/vhive_table_writer.cpp:90: +1, including nesting penalty of 0, nesting level increased to 1

    RETURN_IF_ERROR(vectorized::VExprContext::get_output_block_after_execute_exprs(
    ^

be/src/common/status.h:626: expanded from macro 'RETURN_IF_ERROR'

    do {                                \
    ^

be/src/vec/sink/writer/vhive_table_writer.cpp:90: +2, including nesting penalty of 1, nesting level increased to 2

    RETURN_IF_ERROR(vectorized::VExprContext::get_output_block_after_execute_exprs(
    ^

be/src/common/status.h:628: expanded from macro 'RETURN_IF_ERROR'

        if (UNLIKELY(!_status_.ok())) { \
        ^

be/src/vec/sink/writer/vhive_table_writer.cpp:98: +1, including nesting penalty of 0, nesting level increased to 1

    if (_partition_columns_input_index.empty()) {
    ^

be/src/vec/sink/writer/vhive_table_writer.cpp:103: +2, including nesting penalty of 1, nesting level increased to 2

            if (writer_iter == _partitions_to_writers.end()) {
            ^

be/src/vec/sink/writer/vhive_table_writer.cpp:106: +3, including nesting penalty of 2, nesting level increased to 3

                } catch (doris::Exception& e) {
                  ^

be/src/vec/sink/writer/vhive_table_writer.cpp:110: +3, including nesting penalty of 2, nesting level increased to 3

                RETURN_IF_ERROR(writer->open(_state, _profile));
                ^

be/src/common/status.h:626: expanded from macro 'RETURN_IF_ERROR'

    do {                                \
    ^

be/src/vec/sink/writer/vhive_table_writer.cpp:110: +4, including nesting penalty of 3, nesting level increased to 4

                RETURN_IF_ERROR(writer->open(_state, _profile));
                ^

be/src/common/status.h:628: expanded from macro 'RETURN_IF_ERROR'

        if (UNLIKELY(!_status_.ok())) { \
        ^

be/src/vec/sink/writer/vhive_table_writer.cpp:111: +1, nesting level increased to 2

            } else {
              ^

be/src/vec/sink/writer/vhive_table_writer.cpp:112: +3, including nesting penalty of 2, nesting level increased to 3

                if (writer_iter->second->written_len() > config::hive_sink_max_file_size) {
                ^

be/src/vec/sink/writer/vhive_table_writer.cpp:123: +4, including nesting penalty of 3, nesting level increased to 4

                    } catch (doris::Exception& e) {
                      ^

be/src/vec/sink/writer/vhive_table_writer.cpp:127: +4, including nesting penalty of 3, nesting level increased to 4

                    RETURN_IF_ERROR(writer->open(_state, _profile));
                    ^

be/src/common/status.h:626: expanded from macro 'RETURN_IF_ERROR'

    do {                                \
    ^

be/src/vec/sink/writer/vhive_table_writer.cpp:127: +5, including nesting penalty of 4, nesting level increased to 5

                    RETURN_IF_ERROR(writer->open(_state, _profile));
                    ^

be/src/common/status.h:628: expanded from macro 'RETURN_IF_ERROR'

        if (UNLIKELY(!_status_.ok())) { \
        ^

be/src/vec/sink/writer/vhive_table_writer.cpp:128: +1, nesting level increased to 3

                } else {
                  ^

be/src/vec/sink/writer/vhive_table_writer.cpp:135: +2, including nesting penalty of 1, nesting level increased to 2

        RETURN_IF_ERROR(writer->write(output_block));
        ^

be/src/common/status.h:626: expanded from macro 'RETURN_IF_ERROR'

    do {                                \
    ^

be/src/vec/sink/writer/vhive_table_writer.cpp:135: +3, including nesting penalty of 2, nesting level increased to 3

        RETURN_IF_ERROR(writer->write(output_block));
        ^

be/src/common/status.h:628: expanded from macro 'RETURN_IF_ERROR'

        if (UNLIKELY(!_status_.ok())) { \
        ^

be/src/vec/sink/writer/vhive_table_writer.cpp:141: +1, including nesting penalty of 0, nesting level increased to 1

        for (int i = 0; i < output_block.rows(); ++i) {
        ^

be/src/vec/sink/writer/vhive_table_writer.cpp:145: +2, including nesting penalty of 1, nesting level increased to 2

            } catch (doris::Exception& e) {
              ^

be/src/vec/sink/writer/vhive_table_writer.cpp:152: nesting level increased to 2

                    [&](const std::string& partition_name, int position,
                    ^

be/src/vec/sink/writer/vhive_table_writer.cpp:158: +3, including nesting penalty of 2, nesting level increased to 3

                    RETURN_IF_ERROR(writer->open(_state, _profile));
                    ^

be/src/common/status.h:626: expanded from macro 'RETURN_IF_ERROR'

    do {                                \
    ^

be/src/vec/sink/writer/vhive_table_writer.cpp:158: +4, including nesting penalty of 3, nesting level increased to 4

                    RETURN_IF_ERROR(writer->open(_state, _profile));
                    ^

be/src/common/status.h:628: expanded from macro 'RETURN_IF_ERROR'

        if (UNLIKELY(!_status_.ok())) { \
        ^

be/src/vec/sink/writer/vhive_table_writer.cpp:164: +3, including nesting penalty of 2, nesting level increased to 3

                } catch (doris::Exception& e) {
                  ^

be/src/vec/sink/writer/vhive_table_writer.cpp:171: +2, including nesting penalty of 1, nesting level increased to 2

            if (writer_iter == _partitions_to_writers.end()) {
            ^

be/src/vec/sink/writer/vhive_table_writer.cpp:173: +3, including nesting penalty of 2, nesting level increased to 3

                if (_partitions_to_writers.size() + 1 >
                ^

be/src/vec/sink/writer/vhive_table_writer.cpp:179: +3, including nesting penalty of 2, nesting level increased to 3

                RETURN_IF_ERROR(create_and_open_writer(partition_name, i, nullptr, 0, writer));
                ^

be/src/common/status.h:626: expanded from macro 'RETURN_IF_ERROR'

    do {                                \
    ^

be/src/vec/sink/writer/vhive_table_writer.cpp:179: +4, including nesting penalty of 3, nesting level increased to 4

                RETURN_IF_ERROR(create_and_open_writer(partition_name, i, nullptr, 0, writer));
                ^

be/src/common/status.h:628: expanded from macro 'RETURN_IF_ERROR'

        if (UNLIKELY(!_status_.ok())) { \
        ^

be/src/vec/sink/writer/vhive_table_writer.cpp:180: +1, nesting level increased to 2

            } else {
              ^

be/src/vec/sink/writer/vhive_table_writer.cpp:182: +3, including nesting penalty of 2, nesting level increased to 3

                if (writer_iter->second->written_len() > config::hive_sink_max_file_size) {
                ^

be/src/vec/sink/writer/vhive_table_writer.cpp:191: +4, including nesting penalty of 3, nesting level increased to 4

                    RETURN_IF_ERROR(create_and_open_writer(partition_name, i, &file_name,
                    ^

be/src/common/status.h:626: expanded from macro 'RETURN_IF_ERROR'

    do {                                \
    ^

be/src/vec/sink/writer/vhive_table_writer.cpp:191: +5, including nesting penalty of 4, nesting level increased to 5

                    RETURN_IF_ERROR(create_and_open_writer(partition_name, i, &file_name,
                    ^

be/src/common/status.h:628: expanded from macro 'RETURN_IF_ERROR'

        if (UNLIKELY(!_status_.ok())) { \
        ^

be/src/vec/sink/writer/vhive_table_writer.cpp:193: +1, nesting level increased to 3

                } else {
                  ^

be/src/vec/sink/writer/vhive_table_writer.cpp:197: +3, including nesting penalty of 2, nesting level increased to 3

                if (writer_pos_iter == writer_positions.end()) {
                ^

be/src/vec/sink/writer/vhive_table_writer.cpp:201: +1, nesting level increased to 3

                } else {
                  ^

@@ -81,7 +81,7 @@
return Status::OK();
}

Status VHiveTableWriter::write(vectorized::Block& block) {
Status VHiveTableWriter::write(RuntimeState* state, vectorized::Block& block) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

warning: function 'write' exceeds recommended size/complexity thresholds [readability-function-size]

Status VHiveTableWriter::write(RuntimeState* state, vectorized::Block& block) {
                         ^
Additional context

be/src/vec/sink/writer/vhive_table_writer.cpp:83: 132 lines including whitespace and comments (threshold 80)

Status VHiveTableWriter::write(RuntimeState* state, vectorized::Block& block) {
                         ^

@BiteTheDDDDt
Copy link
Contributor Author

run buildall

1 similar comment
@BiteTheDDDDt
Copy link
Contributor Author

run buildall

dependency->set_ready();
if (_is_cancelled) {
for (auto it : _result_sink_dependencys) {
it.second->set_ready();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
it.second->set_ready();
it.second->set_always_ready();

Gabriel39
Gabriel39 previously approved these changes Jun 18, 2024
@github-actions github-actions bot added the approved Indicates a PR has been approved by one committer. label Jun 18, 2024
Copy link
Contributor

PR approved by at least one committer and no changes requested.

Copy link
Contributor

PR approved by anyone and no changes requested.

HappenLee
HappenLee previously approved these changes Jun 19, 2024
Copy link
Contributor

@HappenLee HappenLee left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

@BiteTheDDDDt
Copy link
Contributor Author

run buildall

@github-actions github-actions bot removed the approved Indicates a PR has been approved by one committer. label Jun 19, 2024
@BiteTheDDDDt
Copy link
Contributor Author

run buildall

@BiteTheDDDDt
Copy link
Contributor Author

run performance

Copy link
Contributor

@HappenLee HappenLee left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

@github-actions github-actions bot added the approved Indicates a PR has been approved by one committer. label Jun 19, 2024
Copy link
Contributor

PR approved by at least one committer and no changes requested.

@BiteTheDDDDt BiteTheDDDDt merged commit fdb5891 into apache:master Jun 19, 2024
25 of 29 checks passed
BiteTheDDDDt added a commit to BiteTheDDDDt/incubator-doris that referenced this pull request Jun 20, 2024
BiteTheDDDDt added a commit to BiteTheDDDDt/incubator-doris that referenced this pull request Jun 21, 2024
## Proposed changes
optimization for parallel result sink apache#36053
dataroaring pushed a commit that referenced this pull request Jun 21, 2024
## Proposed changes
optimization for parallel result sink #36053
BiteTheDDDDt added a commit that referenced this pull request Jun 24, 2024
## Proposed changes
#36305 #36628 reverted coz some bug about local exchange, this pr do not
change local exchange now
dataroaring pushed a commit that referenced this pull request Jun 26, 2024
## Proposed changes
#36305 #36628 reverted coz some bug about local exchange, this pr do not
change local exchange now
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
approved Indicates a PR has been approved by one committer. dev/3.0.0-merged reviewed
Projects
None yet
Development

Successfully merging this pull request may close these issues.

5 participants