diff --git a/src/Core/Settings.h b/src/Core/Settings.h index 55c79af6230..2307b4c21f4 100644 --- a/src/Core/Settings.h +++ b/src/Core/Settings.h @@ -790,7 +790,7 @@ static constexpr UInt64 operator""_GiB(unsigned long long value) M(UInt64, keep_windows, 0, "How many streaming windows to keep from recycling", 0) \ M(String, seek_to, "", "Seeking to an offset of the streaming/historical store to seek", 0) \ M(Bool, enable_backfill_from_historical_store, true, "Enable backfill data from historical data store", 0) \ - M(Bool, emit_aggregated_during_backfill, false, "Enable emit intermediate aggr result during backfill historical data", 0) \ + M(Bool, emit_during_backfill, false, "Enable emit intermediate aggr result during backfill historical data", 0) \ M(Bool, force_backfill_in_order, false, "Requires backfill data in order", 0) \ M(Bool, include_internal_streams, false, "Show internal streams on SHOW streams query.", 0) \ M(UInt64, join_max_buffered_bytes, 524288000, "Max buffered bytes for stream to stream join", 0) \ diff --git a/src/Interpreters/InterpreterSelectQuery.cpp b/src/Interpreters/InterpreterSelectQuery.cpp index 03d273b8dc5..49558771789 100644 --- a/src/Interpreters/InterpreterSelectQuery.cpp +++ b/src/Interpreters/InterpreterSelectQuery.cpp @@ -2280,7 +2280,7 @@ void InterpreterSelectQuery::executeFetchColumns(QueryProcessingStage::Enum proc throw Exception("Subquery expected", ErrorCodes::LOGICAL_ERROR); /// proton: starts. - Streaming::rewriteSubqueryByQueryInfo(subquery->as(), query_info); + Streaming::rewriteSubquery(subquery->as(), query_info); /// proton: ends. interpreter_subquery = std::make_unique( @@ -3442,7 +3442,7 @@ void InterpreterSelectQuery::finalCheckAndOptimizeForStreamingQuery() /// Usually, we don't care whether the backfilled data is in order. Excepts: /// 1) User require backfill data in order /// 2) User need window aggr emit result during backfill (it expects that process data in ascending event time) - if (settings.emit_aggregated_during_backfill.value && hasAggregation() && hasStreamingWindowFunc()) + if (settings.emit_during_backfill.value && hasAggregation() && hasStreamingWindowFunc()) context->setSetting("force_backfill_in_order", true); if (settings.force_backfill_in_order.value) @@ -3508,7 +3508,7 @@ void InterpreterSelectQuery::buildWatermarkQueryPlan(QueryPlan & query_plan) con auto params = std::make_shared( query_info.query, query_info.syntax_analyzer_result, query_info.streaming_window_params); - bool skip_stamping_for_backfill_data = !context->getSettingsRef().emit_aggregated_during_backfill.value; + bool skip_stamping_for_backfill_data = !context->getSettingsRef().emit_during_backfill.value; if (query_info.hasPartitionByKeys()) query_plan.addStep(std::make_unique( diff --git a/src/Interpreters/Streaming/RewriteAsSubquery.cpp b/src/Interpreters/Streaming/RewriteAsSubquery.cpp index 74bb76428eb..416b5d5ae11 100644 --- a/src/Interpreters/Streaming/RewriteAsSubquery.cpp +++ b/src/Interpreters/Streaming/RewriteAsSubquery.cpp @@ -137,13 +137,12 @@ bool rewriteAsChangelogSubquery(ASTTableExpression & table_expression, bool only return rewriteAsChangelogQuery(query); } -bool rewriteSubqueryByQueryInfo(ASTSelectWithUnionQuery & query, const SelectQueryInfo & query_info) +bool rewriteSubquery(ASTSelectWithUnionQuery & query, const SelectQueryInfo & query_info) { - bool rewriten = false; if (query_info.left_input_tracking_changes) - rewriten |= rewriteAsChangelogQuery(query); + return rewriteAsChangelogQuery(query); - return rewriten; + return false; } } } diff --git a/src/Interpreters/Streaming/RewriteAsSubquery.h b/src/Interpreters/Streaming/RewriteAsSubquery.h index 53287bcf64e..8a458755f2f 100644 --- a/src/Interpreters/Streaming/RewriteAsSubquery.h +++ b/src/Interpreters/Streaming/RewriteAsSubquery.h @@ -14,7 +14,7 @@ namespace Streaming /// 1) `stream1` => `(select * from stream1) as stream1` /// 2) `stream2 as t` => `(select * from stream2) as t` /// 3) `table_func(...) as t1` => `(select * from table_func(...)) as t1` -/// Return rewritten subquery (return `nullptr` if is subquery) +/// \return rewritten subquery (return `nullptr` if is subquery) ASTPtr rewriteAsSubquery(ASTTableExpression & table_expression); /// Rewrite `table/table_function` to subquery (emit changelog): @@ -22,10 +22,10 @@ ASTPtr rewriteAsSubquery(ASTTableExpression & table_expression); /// 2) `stream2 as t` => `(select * from stream2 emit changelog) as t` /// 3) `table_func(...) as t1` => `(select * from table_func(...) emit changelog) as t1` /// 4) `(select * from stream1) as s` => `(select * from stream1 emit changelog) as s` -/// Return true if rewritten subquery, otherwise false (if already is changelog subquery or skip storage/table_function) +/// \return true if rewritten subquery, otherwise false (if already is changelog subquery or skip storage/table_function) bool rewriteAsChangelogSubquery(ASTTableExpression & table_expression, bool only_rewrite_subquery); - -bool rewriteSubqueryByQueryInfo(ASTSelectWithUnionQuery & query, const SelectQueryInfo & query_info); +/// \return true if query was rewritten and false otherwise +bool rewriteSubquery(ASTSelectWithUnionQuery & query, const SelectQueryInfo & query_info); } } diff --git a/src/Processors/Transforms/Streaming/HopWatermarkStamper.cpp b/src/Processors/Transforms/Streaming/HopWatermarkStamper.cpp index 78d118c810a..def7080683c 100644 --- a/src/Processors/Transforms/Streaming/HopWatermarkStamper.cpp +++ b/src/Processors/Transforms/Streaming/HopWatermarkStamper.cpp @@ -18,7 +18,7 @@ HopWatermarkStamper::HopWatermarkStamper(const WatermarkStamperParams & params_, throw Exception(ErrorCodes::INCORRECT_QUERY, "{} doesn't support emit mode '{}'", getName(), magic_enum::enum_name(params.mode)); } -Int64 HopWatermarkStamper::calculateWatermarkBasedOnWindowImpl(Int64 event_ts) const +Int64 HopWatermarkStamper::calculateWatermarkImpl(Int64 event_ts) const { auto last_finalized_window = HopHelper::getLastFinalizedWindow(event_ts, window_params); if (likely(last_finalized_window.isValid())) diff --git a/src/Processors/Transforms/Streaming/HopWatermarkStamper.h b/src/Processors/Transforms/Streaming/HopWatermarkStamper.h index c73071757b1..2321b0a1415 100644 --- a/src/Processors/Transforms/Streaming/HopWatermarkStamper.h +++ b/src/Processors/Transforms/Streaming/HopWatermarkStamper.h @@ -17,7 +17,7 @@ class HopWatermarkStamper final : public WatermarkStamper WatermarkStamperPtr clone() const override { return std::make_unique(*this); } private: - Int64 calculateWatermarkBasedOnWindowImpl(Int64 event_ts) const override; + Int64 calculateWatermarkImpl(Int64 event_ts) const override; HopWindowParams & window_params; }; diff --git a/src/Processors/Transforms/Streaming/TumbleWatermarkStamper.cpp b/src/Processors/Transforms/Streaming/TumbleWatermarkStamper.cpp index 97ef6a4d84f..9e5b068a810 100644 --- a/src/Processors/Transforms/Streaming/TumbleWatermarkStamper.cpp +++ b/src/Processors/Transforms/Streaming/TumbleWatermarkStamper.cpp @@ -16,7 +16,7 @@ TumbleWatermarkStamper::TumbleWatermarkStamper(const WatermarkStamperParams & pa throw Exception(ErrorCodes::INCORRECT_QUERY, "{} doesn't support emit mode '{}'", getName(), magic_enum::enum_name(params.mode)); } -Int64 TumbleWatermarkStamper::calculateWatermarkBasedOnWindowImpl(Int64 event_ts) const +Int64 TumbleWatermarkStamper::calculateWatermarkImpl(Int64 event_ts) const { return toStartTime( event_ts, window_params.interval_kind, window_params.window_interval, *window_params.time_zone, window_params.time_scale); diff --git a/src/Processors/Transforms/Streaming/TumbleWatermarkStamper.h b/src/Processors/Transforms/Streaming/TumbleWatermarkStamper.h index df8e639eab2..2a7127bdd27 100644 --- a/src/Processors/Transforms/Streaming/TumbleWatermarkStamper.h +++ b/src/Processors/Transforms/Streaming/TumbleWatermarkStamper.h @@ -18,7 +18,7 @@ class TumbleWatermarkStamper final : public WatermarkStamper WatermarkStamperPtr clone() const override { return std::make_unique(*this); } private: - Int64 calculateWatermarkBasedOnWindowImpl(Int64 event_ts) const override; + Int64 calculateWatermarkImpl(Int64 event_ts) const override; TumbleWindowParams & window_params; }; diff --git a/src/Processors/Transforms/Streaming/WatermarkStamper.cpp b/src/Processors/Transforms/Streaming/WatermarkStamper.cpp index 34619b551ee..d961fd9d3fc 100644 --- a/src/Processors/Transforms/Streaming/WatermarkStamper.cpp +++ b/src/Processors/Transforms/Streaming/WatermarkStamper.cpp @@ -146,7 +146,6 @@ void WatermarkStamper::preProcess(const Block & header) initTimeoutTimer(params.timeout_interval); } -template ALWAYS_INLINE Int64 WatermarkStamper::calculateWatermark(Int64 event_ts) const { if (params.delay_interval) @@ -158,39 +157,47 @@ ALWAYS_INLINE Int64 WatermarkStamper::calculateWatermark(Int64 event_ts) const *params.window_params->time_zone, params.window_params->time_scale); - if constexpr (apply_watermark_per_row) - return event_ts_bias; - else - return calculateWatermarkBasedOnWindowImpl(event_ts_bias); + return calculateWatermarkImpl(event_ts_bias); } else - { - if constexpr (apply_watermark_per_row) - return event_ts; - else - return calculateWatermarkBasedOnWindowImpl(event_ts); - } + return calculateWatermarkImpl(event_ts); +} + +ALWAYS_INLINE Int64 WatermarkStamper::calculateWatermarkPerRow(Int64 event_ts) const +{ + if (params.delay_interval) + return addTime( + event_ts, + params.delay_interval.unit, + -1 * params.delay_interval.interval, + *params.window_params->time_zone, + params.window_params->time_scale); + else + return event_ts; } void WatermarkStamper::processAfterUnmuted(Chunk & chunk) { - assert(!chunk.hasRows() && chunk.isHistoricalDataEnd()); + assert(!chunk.hasRows()); switch (params.mode) { - case WatermarkStamperParams::EmitMode::PERIODIC: { + case WatermarkStamperParams::EmitMode::PERIODIC: + { processPeriodic(chunk); break; } - case WatermarkStamperParams::EmitMode::WATERMARK: { - auto muted_watermark_ts = calculateWatermark(max_event_ts); - if (muted_watermark_ts != INVALID_WATERMARK) + case WatermarkStamperParams::EmitMode::WATERMARK: + { + auto muted_watermark_ts = calculateWatermark(max_event_ts); + if (muted_watermark_ts != INVALID_WATERMARK) [[likely]] chunk.setWatermark(muted_watermark_ts); break; } - case WatermarkStamperParams::EmitMode::WATERMARK_PER_ROW: { - auto muted_watermark_ts = calculateWatermark(max_event_ts); - if (muted_watermark_ts != INVALID_WATERMARK) + case WatermarkStamperParams::EmitMode::WATERMARK_PER_ROW: + { + auto muted_watermark_ts = calculateWatermarkPerRow(max_event_ts); + if (muted_watermark_ts != INVALID_WATERMARK) [[likely]] chunk.setWatermark(muted_watermark_ts); break; } @@ -199,53 +206,53 @@ void WatermarkStamper::processAfterUnmuted(Chunk & chunk) } } -template +void WatermarkStamper::processWithMutedWatermark(Chunk & chunk) +{ + /// NOTE: In order to avoid that when there is only backfill data and no new data, the window aggregation don't emit results after the backfill is completed. + /// Even mute watermark, we still need collect `max_event_ts` which will be used in "processAfterUnmuted()" to emit a watermark as soon as the backfill is completed + if (chunk.hasRows() && (params.mode == WatermarkStamperParams::EmitMode::WATERMARK || params.mode == WatermarkStamperParams::EmitMode::WATERMARK_PER_ROW)) + { + assert(params.window_params); + if (params.window_params->time_col_is_datetime64) + max_event_ts = std::max( + max_event_ts, + *std::ranges::max_element(assert_cast(*chunk.getColumns()[time_col_pos]).getData())); + else + max_event_ts = std::max( + max_event_ts, + *std::ranges::max_element(assert_cast(*chunk.getColumns()[time_col_pos]).getData())); + } + + processTimeout(chunk); + logLateEvents(); +} + void WatermarkStamper::process(Chunk & chunk) { - if constexpr (mute_watermark) + switch (params.mode) { - /// NOTE: In order to avoid that when there is only backfill data and no new data, the window aggregation don't emit results after the backfill is completed. - /// Even mute watermark, we still need collect `max_event_ts` which will be used in "processAfterUnmuted()" to emit a watermark as soon as the backfill is completed - if (chunk.hasRows() && (params.mode == WatermarkStamperParams::EmitMode::WATERMARK || params.mode == WatermarkStamperParams::EmitMode::WATERMARK_PER_ROW)) - { + case WatermarkStamperParams::EmitMode::PERIODIC: { + processPeriodic(chunk); + break; + } + case WatermarkStamperParams::EmitMode::WATERMARK: { assert(params.window_params); if (params.window_params->time_col_is_datetime64) - max_event_ts = std::max( - max_event_ts, - *std::ranges::max_element(assert_cast(*chunk.getColumns()[time_col_pos]).getData())); + processWatermark(chunk); else - max_event_ts = std::max( - max_event_ts, - *std::ranges::max_element(assert_cast(*chunk.getColumns()[time_col_pos]).getData())); + processWatermark(chunk); + break; } - } - else - { - switch (params.mode) - { - case WatermarkStamperParams::EmitMode::PERIODIC: { - processPeriodic(chunk); - break; - } - case WatermarkStamperParams::EmitMode::WATERMARK: { - assert(params.window_params); - if (params.window_params->time_col_is_datetime64) - processWatermark(chunk); - else - processWatermark(chunk); - break; - } - case WatermarkStamperParams::EmitMode::WATERMARK_PER_ROW: { - assert(params.window_params); - if (params.window_params->time_col_is_datetime64) - processWatermark(chunk); - else - processWatermark(chunk); - break; - } - default: - break; + case WatermarkStamperParams::EmitMode::WATERMARK_PER_ROW: { + assert(params.window_params); + if (params.window_params->time_col_is_datetime64) + processWatermark(chunk); + else + processWatermark(chunk); + break; } + default: + break; } processTimeout(chunk); @@ -333,7 +340,7 @@ void WatermarkStamper::processWatermark(Chunk & chunk) max_event_ts = event_ts; if constexpr (apply_watermark_per_row) - event_ts_watermark = calculateWatermark(max_event_ts); + event_ts_watermark = calculateWatermarkPerRow(max_event_ts); } if (unlikely(event_ts < event_ts_watermark)) @@ -344,7 +351,7 @@ void WatermarkStamper::processWatermark(Chunk & chunk) } if constexpr (!apply_watermark_per_row) - event_ts_watermark = calculateWatermark(max_event_ts); + event_ts_watermark = calculateWatermark(max_event_ts); if (late_events_in_chunk > 0) { @@ -364,9 +371,9 @@ void WatermarkStamper::processWatermark(Chunk & chunk) } } -Int64 WatermarkStamper::calculateWatermarkBasedOnWindowImpl(Int64 event_ts) const +Int64 WatermarkStamper::calculateWatermarkImpl(Int64 event_ts) const { - throw Exception(ErrorCodes::NOT_IMPLEMENTED, "calculateWatermarkBasedOnWindowImpl() not implemented in {}", getName()); + throw Exception(ErrorCodes::NOT_IMPLEMENTED, "calculateWatermarkImpl() not implemented in {}", getName()); } void WatermarkStamper::initPeriodicTimer(const WindowInterval & interval) @@ -434,8 +441,5 @@ void WatermarkStamper::deserialize(ReadBuffer & rb) readIntBinary(last_logged_late_events, rb); readIntBinary(last_logged_late_events_ts, rb); } - -template void WatermarkStamper::process(Chunk &); -template void WatermarkStamper::process(Chunk &); } } diff --git a/src/Processors/Transforms/Streaming/WatermarkStamper.h b/src/Processors/Transforms/Streaming/WatermarkStamper.h index 441d9f67d62..5eb3fc7b439 100644 --- a/src/Processors/Transforms/Streaming/WatermarkStamper.h +++ b/src/Processors/Transforms/Streaming/WatermarkStamper.h @@ -59,9 +59,11 @@ SERDE class WatermarkStamper void preProcess(const Block & header); - template void process(Chunk & chunk); + /// During mute watermark, we still need to process the chunk to update max_event_ts + void processWithMutedWatermark(Chunk & chunk); + void processAfterUnmuted(Chunk & chunk); bool requiresPeriodicOrTimeoutEmit() const { return periodic_interval || timeout_interval; } @@ -84,10 +86,10 @@ SERDE class WatermarkStamper void logLateEvents(); - template ALWAYS_INLINE Int64 calculateWatermark(Int64 event_ts) const; + ALWAYS_INLINE Int64 calculateWatermarkPerRow(Int64 event_ts) const; - virtual Int64 calculateWatermarkBasedOnWindowImpl(Int64 event_ts) const; + virtual Int64 calculateWatermarkImpl(Int64 event_ts) const; void initPeriodicTimer(const WindowInterval & interval); diff --git a/src/Processors/Transforms/Streaming/WatermarkTransform.cpp b/src/Processors/Transforms/Streaming/WatermarkTransform.cpp index e561408db14..fd07f066740 100644 --- a/src/Processors/Transforms/Streaming/WatermarkTransform.cpp +++ b/src/Processors/Transforms/Streaming/WatermarkTransform.cpp @@ -71,9 +71,9 @@ void WatermarkTransform::transform(Chunk & chunk) return; if (mute_watermark) - watermark->process(chunk); + watermark->processWithMutedWatermark(chunk); else - watermark->process(chunk); + watermark->process(chunk); } void WatermarkTransform::checkpoint(CheckpointContextPtr ckpt_ctx) diff --git a/src/Processors/Transforms/Streaming/WatermarkTransformWithSubstream.cpp b/src/Processors/Transforms/Streaming/WatermarkTransformWithSubstream.cpp index 557c8a6754f..370059514c7 100644 --- a/src/Processors/Transforms/Streaming/WatermarkTransformWithSubstream.cpp +++ b/src/Processors/Transforms/Streaming/WatermarkTransformWithSubstream.cpp @@ -130,7 +130,7 @@ void WatermarkTransformWithSubstream::work() output_chunks.emplace_back(process_chunk.clone()); for (auto & [id, watermark] : substream_watermarks) { - auto chunk_ctx =ChunkContext::create(); + auto chunk_ctx = ChunkContext::create(); chunk_ctx->setSubstreamID(std::move(id)); process_chunk.setChunkContext(std::move(chunk_ctx)); /// reset context @@ -154,9 +154,9 @@ void WatermarkTransformWithSubstream::work() if (!process_chunk.avoidWatermark()) { if (mute_watermark) - watermark.process(process_chunk); + watermark.processWithMutedWatermark(process_chunk); else - watermark.process(process_chunk); + watermark.process(process_chunk); } assert(process_chunk); diff --git a/src/Storages/StorageView.cpp b/src/Storages/StorageView.cpp index b460cb61a2b..fdde026173c 100644 --- a/src/Storages/StorageView.cpp +++ b/src/Storages/StorageView.cpp @@ -135,7 +135,7 @@ void StorageView::read( } /// proton: starts. - Streaming::rewriteSubqueryByQueryInfo(current_inner_query->as(), query_info); + Streaming::rewriteSubquery(current_inner_query->as(), query_info); /// proton: ends. auto options = SelectQueryOptions(QueryProcessingStage::Complete, 0, true, query_info.settings_limit_offset_done); diff --git a/src/Storages/Streaming/ProxyStream.cpp b/src/Storages/Streaming/ProxyStream.cpp index ac43d4daaf5..849f0bfc18b 100644 --- a/src/Storages/Streaming/ProxyStream.cpp +++ b/src/Storages/Streaming/ProxyStream.cpp @@ -193,7 +193,7 @@ void ProxyStream::doRead( if (current_subquery) { - Streaming::rewriteSubqueryByQueryInfo(current_subquery->as(), query_info); + Streaming::rewriteSubquery(current_subquery->as(), query_info); auto sub_context = createProxySubqueryContext(context_, query_info, isStreamingQuery()); auto interpreter_subquery = std::make_unique( diff --git a/tests/stream/test_stream_smoke/0030_two_level_global_aggr.yaml b/tests/stream/test_stream_smoke/0030_two_level_global_aggr.yaml index 5d315f12fd4..09a1c4d9240 100644 --- a/tests/stream/test_stream_smoke/0030_two_level_global_aggr.yaml +++ b/tests/stream/test_stream_smoke/0030_two_level_global_aggr.yaml @@ -131,7 +131,7 @@ tests: query_id: 3100 depends_on_stream: test_31_multishards_stream query: | - subscribe to with cte as (select i as key, count() from test_31_multishards_stream where _tp_time > earliest_ts() group by key settings group_by_two_level_threshold=50) select count() from cte settings checkpoint_interval=2, emit_aggregated_during_backfill=false; + subscribe to with cte as (select i as key, count() from test_31_multishards_stream where _tp_time > earliest_ts() group by key settings group_by_two_level_threshold=50) select count() from cte settings checkpoint_interval=2, emit_during_backfill=false; - client: python query_type: table @@ -207,7 +207,7 @@ tests: depends_on_stream: test_31_multishards_stream wait: 1 query: | - subscribe to with cte as (select i as key, count() from changelog(test_31_multishards_stream, i) where _tp_time > earliest_ts() group by key emit changelog settings group_by_two_level_threshold=50) select count() from cte settings checkpoint_interval=2, emit_aggregated_during_backfill=false; + subscribe to with cte as (select i as key, count() from changelog(test_31_multishards_stream, i) where _tp_time > earliest_ts() group by key emit changelog settings group_by_two_level_threshold=50) select count() from cte settings checkpoint_interval=2, emit_during_backfill=false; - client: python query_type: table @@ -279,7 +279,7 @@ tests: wait: 1 query_end_timer: 5 query: | - with cte as (select i, count() from test_31_multishards_stream where _tp_time > earliest_ts() shuffle by i group by i settings group_by_two_level_threshold=10) select count() from cte settings emit_aggregated_during_backfill=false; + with cte as (select i, count() from test_31_multishards_stream where _tp_time > earliest_ts() shuffle by i group by i settings group_by_two_level_threshold=10) select count() from cte settings emit_during_backfill=false; - client: python query_type: table