From 11184af59bddb19af45342c44af414353a9dda0d Mon Sep 17 00:00:00 2001 From: lijianan Date: Wed, 3 Jan 2024 14:06:23 +0800 Subject: [PATCH] throw exception when using emit changelog without _tp_delta column --- .../Streaming/ChangelogQueryVisitor.cpp | 11 ++++++++--- src/Storages/StorageView.cpp | 2 +- .../0013_changelog_stream11.yaml | 6 ++++-- .../0013_changelog_stream13.yaml | 2 +- .../0013_changelog_stream8.yaml | 2 +- .../test_stream_smoke/0029_emit_changelog.yaml | 18 +++++++++--------- 6 files changed, 24 insertions(+), 17 deletions(-) diff --git a/src/Interpreters/Streaming/ChangelogQueryVisitor.cpp b/src/Interpreters/Streaming/ChangelogQueryVisitor.cpp index 5c607e5537f..dd5cac5f4ba 100644 --- a/src/Interpreters/Streaming/ChangelogQueryVisitor.cpp +++ b/src/Interpreters/Streaming/ChangelogQueryVisitor.cpp @@ -128,9 +128,14 @@ void ChangelogQueryVisitorMatcher::addDeltaColumn(ASTSelectQuery & select_query, } } - /// Need add delta if _tp_delta is not present and the @p select_query is a subquery or an `EMIT CHANGELOG` query - if (!found_delta_col && (is_subquery || query_info.force_emit_changelog)) - select_expression_list->children.emplace_back(std::make_shared(ProtonConsts::RESERVED_DELTA_FLAG)); + if (!found_delta_col) + { + if (is_subquery) + /// Need add delta if _tp_delta is not present and the @p select_query is a subquery + select_expression_list->children.emplace_back(std::make_shared(ProtonConsts::RESERVED_DELTA_FLAG)); + else if (query_info.force_emit_changelog) + throw Exception(ErrorCodes::NOT_IMPLEMENTED, "The query with emit changelog explicitly requires a `_tp_delta` in select list"); + } if (add_new_required_result_columns) new_required_result_column_names.push_back(ProtonConsts::RESERVED_DELTA_FLAG); diff --git a/src/Storages/StorageView.cpp b/src/Storages/StorageView.cpp index b8a2e5cf900..7df8185888d 100644 --- a/src/Storages/StorageView.cpp +++ b/src/Storages/StorageView.cpp @@ -139,7 +139,7 @@ void StorageView::read( Streaming::rewriteAsChangelogQuery(current_inner_query->as()); /// proton: ends. - auto options = SelectQueryOptions(QueryProcessingStage::Complete, 0, false, query_info.settings_limit_offset_done); + auto options = SelectQueryOptions(QueryProcessingStage::Complete, 0, true, query_info.settings_limit_offset_done); InterpreterSelectWithUnionQuery interpreter(current_inner_query, context, options, column_names); interpreter.addStorageLimits(*query_info.storage_limits); interpreter.buildQueryPlan(query_plan); diff --git a/tests/stream/test_stream_smoke/0013_changelog_stream11.yaml b/tests/stream/test_stream_smoke/0013_changelog_stream11.yaml index d14aef02f19..03cb374b15c 100644 --- a/tests/stream/test_stream_smoke/0013_changelog_stream11.yaml +++ b/tests/stream/test_stream_smoke/0013_changelog_stream11.yaml @@ -454,7 +454,8 @@ tests: create materialized view if not exists test14_view1_11 into test14_target_multishard_changelog_kv_11 as (select i, k1, - k2 + k2, + _tp_delta from (select i, k1, @@ -1257,7 +1258,8 @@ tests: create materialized view if not exists test14_view1_11 into test14_target_changelog_kv_multishard_11 as (select i, k1, - k2 + k2, + _tp_delta from (select i, k1, diff --git a/tests/stream/test_stream_smoke/0013_changelog_stream13.yaml b/tests/stream/test_stream_smoke/0013_changelog_stream13.yaml index a9b9736e52b..3dcafee8a6e 100644 --- a/tests/stream/test_stream_smoke/0013_changelog_stream13.yaml +++ b/tests/stream/test_stream_smoke/0013_changelog_stream13.yaml @@ -40,7 +40,7 @@ tests: wait: 2 depends_on_stream: v_12181627 query_id: '12181627214' - query: subscribe to select sum_distinct(val), sum(val), count_distinct(val), count(val) + query: subscribe to select sum_distinct(val), sum(val), count_distinct(val), count(val), _tp_delta from v_12181627 emit changelog settings checkpoint_interval=1; - client: python query_type: table diff --git a/tests/stream/test_stream_smoke/0013_changelog_stream8.yaml b/tests/stream/test_stream_smoke/0013_changelog_stream8.yaml index e363266f4a4..39d14b47556 100644 --- a/tests/stream/test_stream_smoke/0013_changelog_stream8.yaml +++ b/tests/stream/test_stream_smoke/0013_changelog_stream8.yaml @@ -282,7 +282,7 @@ tests: - client: python query_type: table wait: 2 - query: create materialized view if not exists test14_view2_8 into test14_target_changelog_kv_8 as (select i, k1, k2 from test14_view1_8 emit changelog); + query: create materialized view if not exists test14_view2_8 into test14_target_changelog_kv_8 as (select i, k1, k2, _tp_delta from test14_view1_8 emit changelog); - client: python query_id: '1450' diff --git a/tests/stream/test_stream_smoke/0029_emit_changelog.yaml b/tests/stream/test_stream_smoke/0029_emit_changelog.yaml index ac2302d8f7b..30648dc1f7e 100644 --- a/tests/stream/test_stream_smoke/0029_emit_changelog.yaml +++ b/tests/stream/test_stream_smoke/0029_emit_changelog.yaml @@ -45,7 +45,7 @@ tests: depends_on_stream: test30_stream wait: 1 query: | - select count() from test30_stream emit changelog periodic 1s; + select count(), _tp_delta from test30_stream emit changelog periodic 1s; - client: python query_type: table @@ -96,7 +96,7 @@ tests: depends_on_stream: test30_vk_stream wait: 1 query: | - select k, count() from test30_vk_stream group by k emit changelog periodic 1s; + select k, count(), _tp_delta from test30_vk_stream group by k emit changelog periodic 1s; - client: python query_type: table @@ -148,7 +148,7 @@ tests: depends_on_stream: test30_changelog_kv_stream wait: 1 query: | - select k, count() from test30_changelog_kv_stream group by k emit changelog periodic 1s; + select k, count(), _tp_delta from test30_changelog_kv_stream group by k emit changelog periodic 1s; - client: python query_type: table @@ -200,7 +200,7 @@ tests: depends_on_stream: test30_stream wait: 1 query: | - select k, count() from changelog(test30_stream, i) group by k emit changelog periodic 1s; + select k, count(), _tp_delta from changelog(test30_stream, i) group by k emit changelog periodic 1s; - client: python query_type: table @@ -259,14 +259,14 @@ tests: query_id: 3001 depends_on_stream: test30_stream query: | - select count() from (select k, count() from changelog(test30_stream, i) group by k emit periodic 1s) emit changelog periodic 1s; + select count(), _tp_delta from (select k, count() from changelog(test30_stream, i) group by k emit periodic 1s) emit changelog periodic 1s; - client: python query_type: stream query_id: 3002 depends_on_stream: test30_stream query: | - select count() from (select k, count() from changelog(test30_stream, i) group by k emit changelog periodic 1s) emit changelog periodic 1s; + select count(), _tp_delta from (select k, count() from changelog(test30_stream, i) group by k emit changelog periodic 1s) emit changelog periodic 1s; - client: python query_type: table @@ -339,13 +339,13 @@ tests: depends_on_stream: test30_right_vk_stream wait: 1 query: | - select i, count(), max(test30_right_vk_stream.k) from test30_left_stream join test30_right_vk_stream using(i) group by i emit changelog periodic 1s; + select i, count(), max(test30_right_vk_stream.k), _tp_delta from test30_left_stream join test30_right_vk_stream using(i) group by i emit changelog periodic 1s; - client: python query_type: stream query_id: 3001 query: | - select test30_right_vk_stream.k, count(), max(i) from test30_left_stream join test30_right_vk_stream using(i) group by test30_right_vk_stream.k emit changelog periodic 1s; + select test30_right_vk_stream.k, count(), max(i), _tp_delta from test30_left_stream join test30_right_vk_stream using(i) group by test30_right_vk_stream.k emit changelog periodic 1s; - client: python query_type: table @@ -428,7 +428,7 @@ tests: depends_on_stream: test30_right_vk_stream wait: 1 query: | - subscribe to select i, count(), max(test30_right_vk_stream.k) from test30_left_stream join test30_right_vk_stream using(i) group by i emit changelog periodic 1s settings checkpoint_interval=1; + subscribe to select i, count(), max(test30_right_vk_stream.k), _tp_delta from test30_left_stream join test30_right_vk_stream using(i) group by i emit changelog periodic 1s settings checkpoint_interval=1; - client: python query_type: table