diff --git a/src/Processors/Transforms/Streaming/ChangelogTransform.cpp b/src/Processors/Transforms/Streaming/ChangelogTransform.cpp
index 6bca9a9e142..a17899a756a 100644
--- a/src/Processors/Transforms/Streaming/ChangelogTransform.cpp
+++ b/src/Processors/Transforms/Streaming/ChangelogTransform.cpp
@@ -145,30 +145,46 @@ void ChangelogTransform::work()
         return;
     }
 
-    IColumn::Selector selector(rows, 0);
-    for (size_t i = 0; auto delta : delta_flags)
-        selector[i++] = delta > 0;
-
-    /// FIXME: Consider multiple changelogs processing orders
-    /// Group 0: retract. Group 1: update
-    std::array<Chunk, 2> chunks;
-    for (const auto & col : chunk_columns)
-    {
-        auto split_cols = col->scatter(2, selector);
-        assert(split_cols.size() == 2);
+    /// Cut every column in chunk_columns and put the slices into a new chunk
+    auto cut_cols_into_chunk = [&chunk_columns, this, &delta_flags](UInt64 start_pos, UInt64 end_pos) {
+        Chunk chunk_output;
 
-        for (size_t chunk_index = 0; chunk_index < 2; ++chunk_index)
-            chunks[chunk_index].addColumn(std::move(split_cols[chunk_index]));
-    }
+        for (const auto & col : chunk_columns)
+            chunk_output.addColumn(col->cut(start_pos, end_pos - start_pos));
 
-    if (chunks[0].getNumRows())
+        if (delta_flags[start_pos] < 0)
+        {
+            /// Retract chunk
+            chunk_output.setRetractedDataFlag();
+            this->transformChunk(chunk_output);
+        }
+        else
+        {
+            /// Update chunk
+            chunk_output.setChunkContext(input_data.chunk.getChunkContext());
+            this->transformChunk(chunk_output);
+        }
+    };
+
+    /**
+     * Put consecutive rows with the same _tp_delta value into one chunk.
+     * For example, if the input chunk's delta flags are [1, 1, 1, -1, -1, 1, 1, 1],
+     * we split it into 3 chunks: [[1, 1, 1], [-1, -1], [1, 1, 1]].
+     *
+     * This not only keeps the processing order consistent with the input,
+     * but also guarantees that all _tp_delta values within one chunk are equal.
+     */
+    UInt64 start_pos = 0;
+    for (size_t end_pos = 0; end_pos < delta_flags.size(); ++end_pos)
     {
-        chunks[0].setRetractedDataFlag();
-        transformChunk(chunks[0]);
+        if (delta_flags[end_pos] != delta_flags[start_pos])
+        {
+            cut_cols_into_chunk(start_pos, end_pos);
+            start_pos = end_pos;
+        }
     }
-
-    chunks[1].setChunkContext(input_data.chunk.getChunkContext());
-    transformChunk(chunks[1]);
+    /// Handle the trailing run
+    cut_cols_into_chunk(start_pos, delta_flags.size());
 
     input_data.chunk.clear();
 }
@@ -187,5 +203,7 @@ void ChangelogTransform::transformChunk(Chunk & chunk)
 
     output_chunks.push_back(std::move(chunk));
 }
+
+
 }
 }
\ No newline at end of file
diff --git a/tests/stream/test_stream_smoke/0013_changelog_stream13.yaml b/tests/stream/test_stream_smoke/0013_changelog_stream13.yaml
index 20fc4ab726c..a9b9736e52b 100644
--- a/tests/stream/test_stream_smoke/0013_changelog_stream13.yaml
+++ b/tests/stream/test_stream_smoke/0013_changelog_stream13.yaml
@@ -343,3 +343,40 @@ tests:
           - [120, 210, 3, 5]
           - [120, 120, 3, 3]
           - [30, 30, 1, 1]
+  - id: 108
+    tags:
+      - global aggr distinct
+      - changelog_kv
+      - single shard
+    name: "global aggr on changelog_kv stream with single shard"
+    description: sum_distinct and count_distinct for a changelog_kv stream with a single shard.
+    steps:
+      - statements:
+          - client: python
+            query_type: table
+            query: drop stream if exists changelog_kv_13
+
+          - client: python
+            query_type: table
+            wait: 2
+            query: create stream changelog_kv_13(id int, val int) primary key id settings mode='changelog_kv';
+
+          - client: python
+            query_type: stream
+            wait: 2
+            depends_on_stream: changelog_kv_13
+            query_id: '13108'
+            query: select count_distinct(val), sum_distinct(val) from changelog_kv_13;
+
+          - client: python
+            query_type: table
+            depends_on: '13108'
+            wait: 3
+            kill: '13108'
+            kill_wait: 2
+            query: insert into changelog_kv_13(id, val, _tp_delta) values(2, 1, 1)(2, 1, -1)(3, 2, 1)(3, 2, -1);
+
+    expected_results:
+      - query_id: '13108'
+        expected_results:
+          - [0, 0]
\ No newline at end of file
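
Reviewer note (not part of the patch): below is a minimal, self-contained sketch of the run-splitting idea that the new work() implements. The struct Run and the function split_into_runs are hypothetical names used only for illustration; the actual patch slices the chunk's columns with IColumn::cut and routes each slice through transformChunk, flagging runs whose _tp_delta is negative as retractions.

// Standalone sketch, compiles with any C++17 compiler; no Proton types are used.
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <vector>

// One maximal run of consecutive rows sharing the same _tp_delta value.
struct Run
{
    std::size_t start;  // first row of the run
    std::size_t end;    // one past the last row of the run
    std::int8_t delta;  // the shared _tp_delta value
};

// Split delta flags into maximal runs of equal values, preserving input order.
// Mirrors the two-pointer loop added to ChangelogTransform::work().
std::vector<Run> split_into_runs(const std::vector<std::int8_t> & delta_flags)
{
    std::vector<Run> runs;
    std::size_t start_pos = 0;
    for (std::size_t end_pos = 0; end_pos < delta_flags.size(); ++end_pos)
    {
        if (delta_flags[end_pos] != delta_flags[start_pos])
        {
            runs.push_back({start_pos, end_pos, delta_flags[start_pos]});
            start_pos = end_pos;
        }
    }
    if (start_pos < delta_flags.size())  // the trailing run
        runs.push_back({start_pos, delta_flags.size(), delta_flags[start_pos]});
    return runs;
}

int main()
{
    // The example from the patch comment: [1, 1, 1, -1, -1, 1, 1, 1]
    auto runs = split_into_runs({1, 1, 1, -1, -1, 1, 1, 1});
    assert(runs.size() == 3);                        // [1,1,1], [-1,-1], [1,1,1]
    assert(runs[1].delta == -1);                     // the middle run is a retraction
    assert(runs[2].start == 5 && runs[2].end == 8);  // input order is preserved
    return 0;
}

Compared with the old scatter-based two-way split, which grouped all retractions into one chunk and all updates into another, emitting one chunk per run preserves the input's retract/update ordering, which matters when retractions and updates interleave within a single input chunk.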