Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

change the data processing order in changelog #449

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 38 additions & 20 deletions src/Processors/Transforms/Streaming/ChangelogTransform.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -145,30 +145,46 @@ void ChangelogTransform::work()
return;
}

IColumn::Selector selector(rows, 0);
for (size_t i = 0; auto delta : delta_flags)
selector[i++] = delta > 0;

/// FIXME: Consider multiple changelogs processing orders
/// Group 0: retract. Group 1: update
std::array<Chunk, 2> chunks;
for (const auto & col : chunk_columns)
{
auto split_cols = col->scatter(2, selector);
assert(split_cols.size() == 2);
/// cut every column in chunk_columns and put them into a new chunk
auto cut_cols_into_chunk = [&chunk_columns, this, &delta_flags](UInt64 & start_pos, UInt64 end_pos) {
Chunk chunk_output;

for (size_t chunk_index = 0; chunk_index < 2; ++chunk_index)
chunks[chunk_index].addColumn(std::move(split_cols[chunk_index]));
}
for (const auto & col : chunk_columns)
chunk_output.addColumn(col->cut(start_pos, end_pos - start_pos));

if (chunks[0].getNumRows())
if (delta_flags[start_pos] < 0)
{
/// retract chunk
chunk_output.setRetractedDataFlag();
this->transformChunk(chunk_output);
}
else
{
/// update chunk
chunk_output.setChunkContext(input_data.chunk.getChunkContext());
this->transformChunk(chunk_output);
}
};

/**
* @brief Put consecutive data with the same _tp_delta value in a chunk.
* For example: if the input chunk delta flags are [1, 1, 1, -1, -1, 1, 1, 1]
* We will split into 3 chunks:[[1, 1, 1], [-1, -1], [1, 1, 1]].
*
* This not only ensures that the order of data processing is consistent with the input,
* but also ensures that the _tp_delta values in the same chunk are the same.
*/
UInt64 start_pos = 0;
for (size_t end_pos = 0; end_pos < delta_flags.size(); ++end_pos)
{
chunks[0].setRetractedDataFlag();
transformChunk(chunks[0]);
if (delta_flags[end_pos] != delta_flags[start_pos])
{
cut_cols_into_chunk(start_pos, end_pos);
start_pos = end_pos;
}
}

chunks[1].setChunkContext(input_data.chunk.getChunkContext());
transformChunk(chunks[1]);
/// handle the last part
cut_cols_into_chunk(start_pos, delta_flags.size());

input_data.chunk.clear();
}
Expand All @@ -187,5 +203,7 @@ void ChangelogTransform::transformChunk(Chunk & chunk)

output_chunks.push_back(std::move(chunk));
}


}
}
37 changes: 37 additions & 0 deletions tests/stream/test_stream_smoke/0013_changelog_stream13.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -343,3 +343,40 @@ tests:
- [120, 210, 3, 5]
- [120, 120, 3, 3]
- [30, 30, 1, 1]
- id: 108
tags:
- global aggr distinct
- changelog_kv
- single shard
name: "global aggr on changelog_kv stream with single shard"
description: sum_distinct and count_distinct for changelog_kv stream with single shard.
steps:
- statements:
- client: python
query_type: table
query: drop stream if exists changelog_kv_13

- client: python
query_type: table
wait: 2
query: create stream changelog_kv_13(id int, val int) primary key id settings mode='changelog_kv';

- client: python
query_type: stream
wait: 2
depends_on_stream: changelog_kv_13
query_id: '13108'
query: select count_distinct(val), sum_distinct(val) from changelog_kv_13;

- client: python
query_type: table
depends_on: '13108'
wait: 3
kill: '13108'
kill_wait: 2
query: insert into changelog_kv_13(id, val, _tp_delta) values(2, 1, 1)(2, 1, -1)(3, 2, 1)(3, 2, -1);

expected_results:
- query_id: '13108'
expected_results:
- [0, 0]
Loading