Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

throw exception when using emit changelog without _tp_delta column #466

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 8 additions & 3 deletions src/Interpreters/Streaming/ChangelogQueryVisitor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -128,9 +128,14 @@ void ChangelogQueryVisitorMatcher::addDeltaColumn(ASTSelectQuery & select_query,
}
}

/// Need add delta if _tp_delta is not present and the @p select_query is a subquery or an `EMIT CHANGELOG` query
if (!found_delta_col && (is_subquery || query_info.force_emit_changelog))
select_expression_list->children.emplace_back(std::make_shared<ASTIdentifier>(ProtonConsts::RESERVED_DELTA_FLAG));
if (!found_delta_col)
{
if (is_subquery)
/// Need add delta if _tp_delta is not present and the @p select_query is a subquery
select_expression_list->children.emplace_back(std::make_shared<ASTIdentifier>(ProtonConsts::RESERVED_DELTA_FLAG));
else if (query_info.force_emit_changelog)
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "The query with emit changelog explicitly requires a `_tp_delta` in select list");
}

if (add_new_required_result_columns)
new_required_result_column_names.push_back(ProtonConsts::RESERVED_DELTA_FLAG);
Expand Down
2 changes: 1 addition & 1 deletion src/Storages/StorageView.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ void StorageView::read(
Streaming::rewriteAsChangelogQuery(current_inner_query->as<ASTSelectWithUnionQuery &>());
/// proton: ends.

auto options = SelectQueryOptions(QueryProcessingStage::Complete, 0, false, query_info.settings_limit_offset_done);
auto options = SelectQueryOptions(QueryProcessingStage::Complete, 0, true, query_info.settings_limit_offset_done);
InterpreterSelectWithUnionQuery interpreter(current_inner_query, context, options, column_names);
interpreter.addStorageLimits(*query_info.storage_limits);
interpreter.buildQueryPlan(query_plan);
Expand Down
6 changes: 4 additions & 2 deletions tests/stream/test_stream_smoke/0013_changelog_stream11.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -454,7 +454,8 @@ tests:
create materialized view if not exists test14_view1_11 into test14_target_multishard_changelog_kv_11 as
(select i,
k1,
k2
k2,
_tp_delta
from
(select i,
k1,
Expand Down Expand Up @@ -1257,7 +1258,8 @@ tests:
create materialized view if not exists test14_view1_11 into test14_target_changelog_kv_multishard_11 as
(select i,
k1,
k2
k2,
_tp_delta
from
(select i,
k1,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ tests:
wait: 2
depends_on_stream: v_12181627
query_id: '12181627214'
query: subscribe to select sum_distinct(val), sum(val), count_distinct(val), count(val)
query: subscribe to select sum_distinct(val), sum(val), count_distinct(val), count(val), _tp_delta
from v_12181627 emit changelog settings checkpoint_interval=1;
- client: python
query_type: table
Expand Down
2 changes: 1 addition & 1 deletion tests/stream/test_stream_smoke/0013_changelog_stream8.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -282,7 +282,7 @@ tests:
- client: python
query_type: table
wait: 2
query: create materialized view if not exists test14_view2_8 into test14_target_changelog_kv_8 as (select i, k1, k2 from test14_view1_8 emit changelog);
query: create materialized view if not exists test14_view2_8 into test14_target_changelog_kv_8 as (select i, k1, k2, _tp_delta from test14_view1_8 emit changelog);

- client: python
query_id: '1450'
Expand Down
18 changes: 9 additions & 9 deletions tests/stream/test_stream_smoke/0029_emit_changelog.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ tests:
depends_on_stream: test30_stream
wait: 1
query: |
select count() from test30_stream emit changelog periodic 1s;
select count(), _tp_delta from test30_stream emit changelog periodic 1s;

- client: python
query_type: table
Expand Down Expand Up @@ -96,7 +96,7 @@ tests:
depends_on_stream: test30_vk_stream
wait: 1
query: |
select k, count() from test30_vk_stream group by k emit changelog periodic 1s;
select k, count(), _tp_delta from test30_vk_stream group by k emit changelog periodic 1s;

- client: python
query_type: table
Expand Down Expand Up @@ -148,7 +148,7 @@ tests:
depends_on_stream: test30_changelog_kv_stream
wait: 1
query: |
select k, count() from test30_changelog_kv_stream group by k emit changelog periodic 1s;
select k, count(), _tp_delta from test30_changelog_kv_stream group by k emit changelog periodic 1s;

- client: python
query_type: table
Expand Down Expand Up @@ -200,7 +200,7 @@ tests:
depends_on_stream: test30_stream
wait: 1
query: |
select k, count() from changelog(test30_stream, i) group by k emit changelog periodic 1s;
select k, count(), _tp_delta from changelog(test30_stream, i) group by k emit changelog periodic 1s;

- client: python
query_type: table
Expand Down Expand Up @@ -259,14 +259,14 @@ tests:
query_id: 3001
depends_on_stream: test30_stream
query: |
select count() from (select k, count() from changelog(test30_stream, i) group by k emit periodic 1s) emit changelog periodic 1s;
select count(), _tp_delta from (select k, count() from changelog(test30_stream, i) group by k emit periodic 1s) emit changelog periodic 1s;

- client: python
query_type: stream
query_id: 3002
depends_on_stream: test30_stream
query: |
select count() from (select k, count() from changelog(test30_stream, i) group by k emit changelog periodic 1s) emit changelog periodic 1s;
select count(), _tp_delta from (select k, count() from changelog(test30_stream, i) group by k emit changelog periodic 1s) emit changelog periodic 1s;

- client: python
query_type: table
Expand Down Expand Up @@ -339,13 +339,13 @@ tests:
depends_on_stream: test30_right_vk_stream
wait: 1
query: |
select i, count(), max(test30_right_vk_stream.k) from test30_left_stream join test30_right_vk_stream using(i) group by i emit changelog periodic 1s;
select i, count(), max(test30_right_vk_stream.k), _tp_delta from test30_left_stream join test30_right_vk_stream using(i) group by i emit changelog periodic 1s;

- client: python
query_type: stream
query_id: 3001
query: |
select test30_right_vk_stream.k, count(), max(i) from test30_left_stream join test30_right_vk_stream using(i) group by test30_right_vk_stream.k emit changelog periodic 1s;
select test30_right_vk_stream.k, count(), max(i), _tp_delta from test30_left_stream join test30_right_vk_stream using(i) group by test30_right_vk_stream.k emit changelog periodic 1s;

- client: python
query_type: table
Expand Down Expand Up @@ -428,7 +428,7 @@ tests:
depends_on_stream: test30_right_vk_stream
wait: 1
query: |
subscribe to select i, count(), max(test30_right_vk_stream.k) from test30_left_stream join test30_right_vk_stream using(i) group by i emit changelog periodic 1s settings checkpoint_interval=1;
subscribe to select i, count(), max(test30_right_vk_stream.k), _tp_delta from test30_left_stream join test30_right_vk_stream using(i) group by i emit changelog periodic 1s settings checkpoint_interval=1;

- client: python
query_type: table
Expand Down
Loading