Skip to content

Commit

Permalink
fix comments
Browse files Browse the repository at this point in the history
  • Loading branch information
yl-lisen committed Jan 15, 2024
1 parent 4bd28d5 commit 8ce6bed
Show file tree
Hide file tree
Showing 15 changed files with 99 additions and 94 deletions.
2 changes: 1 addition & 1 deletion src/Core/Settings.h
Original file line number Diff line number Diff line change
Expand Up @@ -790,7 +790,7 @@ static constexpr UInt64 operator""_GiB(unsigned long long value)
M(UInt64, keep_windows, 0, "How many streaming windows to keep from recycling", 0) \
M(String, seek_to, "", "Seeking to an offset of the streaming/historical store to seek", 0) \
M(Bool, enable_backfill_from_historical_store, true, "Enable backfill data from historical data store", 0) \
M(Bool, emit_aggregated_during_backfill, false, "Enable emit intermediate aggr result during backfill historical data", 0) \
M(Bool, emit_during_backfill, false, "Enable emit intermediate aggr result during backfill historical data", 0) \
M(Bool, force_backfill_in_order, false, "Requires backfill data in order", 0) \
M(Bool, include_internal_streams, false, "Show internal streams on SHOW streams query.", 0) \
M(UInt64, join_max_buffered_bytes, 524288000, "Max buffered bytes for stream to stream join", 0) \
Expand Down
6 changes: 3 additions & 3 deletions src/Interpreters/InterpreterSelectQuery.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2280,7 +2280,7 @@ void InterpreterSelectQuery::executeFetchColumns(QueryProcessingStage::Enum proc
throw Exception("Subquery expected", ErrorCodes::LOGICAL_ERROR);

/// proton: starts.
Streaming::rewriteSubqueryByQueryInfo(subquery->as<ASTSelectWithUnionQuery &>(), query_info);
Streaming::rewriteSubquery(subquery->as<ASTSelectWithUnionQuery &>(), query_info);
/// proton: ends.

interpreter_subquery = std::make_unique<InterpreterSelectWithUnionQuery>(
Expand Down Expand Up @@ -3442,7 +3442,7 @@ void InterpreterSelectQuery::finalCheckAndOptimizeForStreamingQuery()
/// Usually, we don't care whether the backfilled data is in order. Excepts:
/// 1) User require backfill data in order
/// 2) User need window aggr emit result during backfill (it expects that process data in ascending event time)
if (settings.emit_aggregated_during_backfill.value && hasAggregation() && hasStreamingWindowFunc())
if (settings.emit_during_backfill.value && hasAggregation() && hasStreamingWindowFunc())
context->setSetting("force_backfill_in_order", true);

if (settings.force_backfill_in_order.value)
Expand Down Expand Up @@ -3508,7 +3508,7 @@ void InterpreterSelectQuery::buildWatermarkQueryPlan(QueryPlan & query_plan) con
auto params = std::make_shared<Streaming::WatermarkStamperParams>(
query_info.query, query_info.syntax_analyzer_result, query_info.streaming_window_params);

bool skip_stamping_for_backfill_data = !context->getSettingsRef().emit_aggregated_during_backfill.value;
bool skip_stamping_for_backfill_data = !context->getSettingsRef().emit_during_backfill.value;

if (query_info.hasPartitionByKeys())
query_plan.addStep(std::make_unique<Streaming::WatermarkStepWithSubstream>(
Expand Down
7 changes: 3 additions & 4 deletions src/Interpreters/Streaming/RewriteAsSubquery.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -137,13 +137,12 @@ bool rewriteAsChangelogSubquery(ASTTableExpression & table_expression, bool only
return rewriteAsChangelogQuery(query);
}

bool rewriteSubqueryByQueryInfo(ASTSelectWithUnionQuery & query, const SelectQueryInfo & query_info)
bool rewriteSubquery(ASTSelectWithUnionQuery & query, const SelectQueryInfo & query_info)
{
bool rewriten = false;
if (query_info.left_input_tracking_changes)
rewriten |= rewriteAsChangelogQuery(query);
return rewriteAsChangelogQuery(query);

return rewriten;
return false;
}
}
}
8 changes: 4 additions & 4 deletions src/Interpreters/Streaming/RewriteAsSubquery.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,18 +14,18 @@ namespace Streaming
/// 1) `stream1` => `(select * from stream1) as stream1`
/// 2) `stream2 as t` => `(select * from stream2) as t`
/// 3) `table_func(...) as t1` => `(select * from table_func(...)) as t1`
/// Return rewritten subquery (return `nullptr` if is subquery)
/// \return rewritten subquery (return `nullptr` if is subquery)
ASTPtr rewriteAsSubquery(ASTTableExpression & table_expression);

/// Rewrite `table/table_function` to subquery (emit changelog):
/// 1) `stream1` => `(select * from stream1 emit changelog) as stream1`
/// 2) `stream2 as t` => `(select * from stream2 emit changelog) as t`
/// 3) `table_func(...) as t1` => `(select * from table_func(...) emit changelog) as t1`
/// 4) `(select * from stream1) as s` => `(select * from stream1 emit changelog) as s`
/// Return true if rewritten subquery, otherwise false (if already is changelog subquery or skip storage/table_function)
/// \return true if rewritten subquery, otherwise false (if already is changelog subquery or skip storage/table_function)
bool rewriteAsChangelogSubquery(ASTTableExpression & table_expression, bool only_rewrite_subquery);


bool rewriteSubqueryByQueryInfo(ASTSelectWithUnionQuery & query, const SelectQueryInfo & query_info);
/// \return true if query was rewritten and false otherwise
bool rewriteSubquery(ASTSelectWithUnionQuery & query, const SelectQueryInfo & query_info);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ HopWatermarkStamper::HopWatermarkStamper(const WatermarkStamperParams & params_,
throw Exception(ErrorCodes::INCORRECT_QUERY, "{} doesn't support emit mode '{}'", getName(), magic_enum::enum_name(params.mode));
}

Int64 HopWatermarkStamper::calculateWatermarkBasedOnWindowImpl(Int64 event_ts) const
Int64 HopWatermarkStamper::calculateWatermarkImpl(Int64 event_ts) const
{
auto last_finalized_window = HopHelper::getLastFinalizedWindow(event_ts, window_params);
if (likely(last_finalized_window.isValid()))
Expand Down
2 changes: 1 addition & 1 deletion src/Processors/Transforms/Streaming/HopWatermarkStamper.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ class HopWatermarkStamper final : public WatermarkStamper
WatermarkStamperPtr clone() const override { return std::make_unique<HopWatermarkStamper>(*this); }

private:
Int64 calculateWatermarkBasedOnWindowImpl(Int64 event_ts) const override;
Int64 calculateWatermarkImpl(Int64 event_ts) const override;

HopWindowParams & window_params;
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ TumbleWatermarkStamper::TumbleWatermarkStamper(const WatermarkStamperParams & pa
throw Exception(ErrorCodes::INCORRECT_QUERY, "{} doesn't support emit mode '{}'", getName(), magic_enum::enum_name(params.mode));
}

Int64 TumbleWatermarkStamper::calculateWatermarkBasedOnWindowImpl(Int64 event_ts) const
Int64 TumbleWatermarkStamper::calculateWatermarkImpl(Int64 event_ts) const
{
return toStartTime(
event_ts, window_params.interval_kind, window_params.window_interval, *window_params.time_zone, window_params.time_scale);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ class TumbleWatermarkStamper final : public WatermarkStamper
WatermarkStamperPtr clone() const override { return std::make_unique<TumbleWatermarkStamper>(*this); }

private:
Int64 calculateWatermarkBasedOnWindowImpl(Int64 event_ts) const override;
Int64 calculateWatermarkImpl(Int64 event_ts) const override;

TumbleWindowParams & window_params;
};
Expand Down
134 changes: 69 additions & 65 deletions src/Processors/Transforms/Streaming/WatermarkStamper.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,6 @@ void WatermarkStamper::preProcess(const Block & header)
initTimeoutTimer(params.timeout_interval);
}

template <bool apply_watermark_per_row>
ALWAYS_INLINE Int64 WatermarkStamper::calculateWatermark(Int64 event_ts) const
{
if (params.delay_interval)
Expand All @@ -158,39 +157,47 @@ ALWAYS_INLINE Int64 WatermarkStamper::calculateWatermark(Int64 event_ts) const
*params.window_params->time_zone,
params.window_params->time_scale);

if constexpr (apply_watermark_per_row)
return event_ts_bias;
else
return calculateWatermarkBasedOnWindowImpl(event_ts_bias);
return calculateWatermarkImpl(event_ts_bias);
}
else
{
if constexpr (apply_watermark_per_row)
return event_ts;
else
return calculateWatermarkBasedOnWindowImpl(event_ts);
}
return calculateWatermarkImpl(event_ts);
}

ALWAYS_INLINE Int64 WatermarkStamper::calculateWatermarkPerRow(Int64 event_ts) const
{
if (params.delay_interval)
return addTime(
event_ts,
params.delay_interval.unit,
-1 * params.delay_interval.interval,
*params.window_params->time_zone,
params.window_params->time_scale);
else
return event_ts;
}

void WatermarkStamper::processAfterUnmuted(Chunk & chunk)
{
assert(!chunk.hasRows() && chunk.isHistoricalDataEnd());
assert(!chunk.hasRows());

switch (params.mode)
{
case WatermarkStamperParams::EmitMode::PERIODIC: {
case WatermarkStamperParams::EmitMode::PERIODIC:
{
processPeriodic(chunk);
break;
}
case WatermarkStamperParams::EmitMode::WATERMARK: {
auto muted_watermark_ts = calculateWatermark<false>(max_event_ts);
if (muted_watermark_ts != INVALID_WATERMARK)
case WatermarkStamperParams::EmitMode::WATERMARK:
{
auto muted_watermark_ts = calculateWatermark(max_event_ts);
if (muted_watermark_ts != INVALID_WATERMARK) [[likely]]
chunk.setWatermark(muted_watermark_ts);
break;
}
case WatermarkStamperParams::EmitMode::WATERMARK_PER_ROW: {
auto muted_watermark_ts = calculateWatermark<true>(max_event_ts);
if (muted_watermark_ts != INVALID_WATERMARK)
case WatermarkStamperParams::EmitMode::WATERMARK_PER_ROW:
{
auto muted_watermark_ts = calculateWatermarkPerRow(max_event_ts);
if (muted_watermark_ts != INVALID_WATERMARK) [[likely]]
chunk.setWatermark(muted_watermark_ts);
break;
}
Expand All @@ -199,53 +206,53 @@ void WatermarkStamper::processAfterUnmuted(Chunk & chunk)
}
}

template <bool mute_watermark>
void WatermarkStamper::processWithMutedWatermark(Chunk & chunk)
{
/// NOTE: In order to avoid that when there is only backfill data and no new data, the window aggregation don't emit results after the backfill is completed.
/// Even mute watermark, we still need collect `max_event_ts` which will be used in "processAfterUnmuted()" to emit a watermark as soon as the backfill is completed
if (chunk.hasRows() && (params.mode == WatermarkStamperParams::EmitMode::WATERMARK || params.mode == WatermarkStamperParams::EmitMode::WATERMARK_PER_ROW))
{
assert(params.window_params);
if (params.window_params->time_col_is_datetime64)
max_event_ts = std::max<Int64>(
max_event_ts,
*std::ranges::max_element(assert_cast<const ColumnDateTime64 &>(*chunk.getColumns()[time_col_pos]).getData()));
else
max_event_ts = std::max<Int64>(
max_event_ts,
*std::ranges::max_element(assert_cast<const ColumnDateTime &>(*chunk.getColumns()[time_col_pos]).getData()));
}

processTimeout(chunk);
logLateEvents();
}

void WatermarkStamper::process(Chunk & chunk)
{
if constexpr (mute_watermark)
switch (params.mode)
{
/// NOTE: In order to avoid that when there is only backfill data and no new data, the window aggregation don't emit results after the backfill is completed.
/// Even mute watermark, we still need collect `max_event_ts` which will be used in "processAfterUnmuted()" to emit a watermark as soon as the backfill is completed
if (chunk.hasRows() && (params.mode == WatermarkStamperParams::EmitMode::WATERMARK || params.mode == WatermarkStamperParams::EmitMode::WATERMARK_PER_ROW))
{
case WatermarkStamperParams::EmitMode::PERIODIC: {
processPeriodic(chunk);
break;
}
case WatermarkStamperParams::EmitMode::WATERMARK: {
assert(params.window_params);
if (params.window_params->time_col_is_datetime64)
max_event_ts = std::max<Int64>(
max_event_ts,
*std::ranges::max_element(assert_cast<const ColumnDateTime64 &>(*chunk.getColumns()[time_col_pos]).getData()));
processWatermark<ColumnDateTime64, false>(chunk);
else
max_event_ts = std::max<Int64>(
max_event_ts,
*std::ranges::max_element(assert_cast<const ColumnDateTime &>(*chunk.getColumns()[time_col_pos]).getData()));
processWatermark<ColumnDateTime, false>(chunk);
break;
}
}
else
{
switch (params.mode)
{
case WatermarkStamperParams::EmitMode::PERIODIC: {
processPeriodic(chunk);
break;
}
case WatermarkStamperParams::EmitMode::WATERMARK: {
assert(params.window_params);
if (params.window_params->time_col_is_datetime64)
processWatermark<ColumnDateTime64, false>(chunk);
else
processWatermark<ColumnDateTime, false>(chunk);
break;
}
case WatermarkStamperParams::EmitMode::WATERMARK_PER_ROW: {
assert(params.window_params);
if (params.window_params->time_col_is_datetime64)
processWatermark<ColumnDateTime64, true>(chunk);
else
processWatermark<ColumnDateTime, true>(chunk);
break;
}
default:
break;
case WatermarkStamperParams::EmitMode::WATERMARK_PER_ROW: {
assert(params.window_params);
if (params.window_params->time_col_is_datetime64)
processWatermark<ColumnDateTime64, true>(chunk);
else
processWatermark<ColumnDateTime, true>(chunk);
break;
}
default:
break;
}

processTimeout(chunk);
Expand Down Expand Up @@ -333,7 +340,7 @@ void WatermarkStamper::processWatermark(Chunk & chunk)
max_event_ts = event_ts;

if constexpr (apply_watermark_per_row)
event_ts_watermark = calculateWatermark<apply_watermark_per_row>(max_event_ts);
event_ts_watermark = calculateWatermarkPerRow(max_event_ts);
}

if (unlikely(event_ts < event_ts_watermark))
Expand All @@ -344,7 +351,7 @@ void WatermarkStamper::processWatermark(Chunk & chunk)
}

if constexpr (!apply_watermark_per_row)
event_ts_watermark = calculateWatermark<apply_watermark_per_row>(max_event_ts);
event_ts_watermark = calculateWatermark(max_event_ts);

if (late_events_in_chunk > 0)
{
Expand All @@ -364,9 +371,9 @@ void WatermarkStamper::processWatermark(Chunk & chunk)
}
}

Int64 WatermarkStamper::calculateWatermarkBasedOnWindowImpl(Int64 event_ts) const
Int64 WatermarkStamper::calculateWatermarkImpl(Int64 event_ts) const
{
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "calculateWatermarkBasedOnWindowImpl() not implemented in {}", getName());
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "calculateWatermarkImpl() not implemented in {}", getName());
}

void WatermarkStamper::initPeriodicTimer(const WindowInterval & interval)
Expand Down Expand Up @@ -434,8 +441,5 @@ void WatermarkStamper::deserialize(ReadBuffer & rb)
readIntBinary(last_logged_late_events, rb);
readIntBinary(last_logged_late_events_ts, rb);
}

template void WatermarkStamper::process<true>(Chunk &);
template void WatermarkStamper::process<false>(Chunk &);
}
}
8 changes: 5 additions & 3 deletions src/Processors/Transforms/Streaming/WatermarkStamper.h
Original file line number Diff line number Diff line change
Expand Up @@ -59,9 +59,11 @@ SERDE class WatermarkStamper

void preProcess(const Block & header);

template <bool mute_watermark = false>
void process(Chunk & chunk);

/// During mute watermark, we still need to process the chunk to update max_event_ts
void processWithMutedWatermark(Chunk & chunk);

void processAfterUnmuted(Chunk & chunk);

bool requiresPeriodicOrTimeoutEmit() const { return periodic_interval || timeout_interval; }
Expand All @@ -84,10 +86,10 @@ SERDE class WatermarkStamper

void logLateEvents();

template <bool apply_watermark_per_row>
ALWAYS_INLINE Int64 calculateWatermark(Int64 event_ts) const;
ALWAYS_INLINE Int64 calculateWatermarkPerRow(Int64 event_ts) const;

virtual Int64 calculateWatermarkBasedOnWindowImpl(Int64 event_ts) const;
virtual Int64 calculateWatermarkImpl(Int64 event_ts) const;

void initPeriodicTimer(const WindowInterval & interval);

Expand Down
4 changes: 2 additions & 2 deletions src/Processors/Transforms/Streaming/WatermarkTransform.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -71,9 +71,9 @@ void WatermarkTransform::transform(Chunk & chunk)
return;

if (mute_watermark)
watermark->process<true>(chunk);
watermark->processWithMutedWatermark(chunk);
else
watermark->process<false>(chunk);
watermark->process(chunk);
}

void WatermarkTransform::checkpoint(CheckpointContextPtr ckpt_ctx)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ void WatermarkTransformWithSubstream::work()
output_chunks.emplace_back(process_chunk.clone());
for (auto & [id, watermark] : substream_watermarks)
{
auto chunk_ctx =ChunkContext::create();
auto chunk_ctx = ChunkContext::create();
chunk_ctx->setSubstreamID(std::move(id));
process_chunk.setChunkContext(std::move(chunk_ctx)); /// reset context

Expand All @@ -154,9 +154,9 @@ void WatermarkTransformWithSubstream::work()
if (!process_chunk.avoidWatermark())
{
if (mute_watermark)
watermark.process<true>(process_chunk);
watermark.processWithMutedWatermark(process_chunk);
else
watermark.process<false>(process_chunk);
watermark.process(process_chunk);
}

assert(process_chunk);
Expand Down
2 changes: 1 addition & 1 deletion src/Storages/StorageView.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ void StorageView::read(
}

/// proton: starts.
Streaming::rewriteSubqueryByQueryInfo(current_inner_query->as<ASTSelectWithUnionQuery &>(), query_info);
Streaming::rewriteSubquery(current_inner_query->as<ASTSelectWithUnionQuery &>(), query_info);
/// proton: ends.

auto options = SelectQueryOptions(QueryProcessingStage::Complete, 0, true, query_info.settings_limit_offset_done);
Expand Down
2 changes: 1 addition & 1 deletion src/Storages/Streaming/ProxyStream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,7 @@ void ProxyStream::doRead(

if (current_subquery)
{
Streaming::rewriteSubqueryByQueryInfo(current_subquery->as<ASTSelectWithUnionQuery &>(), query_info);
Streaming::rewriteSubquery(current_subquery->as<ASTSelectWithUnionQuery &>(), query_info);

auto sub_context = createProxySubqueryContext(context_, query_info, isStreamingQuery());
auto interpreter_subquery = std::make_unique<InterpreterSelectWithUnionQuery>(
Expand Down
Loading

0 comments on commit 8ce6bed

Please sign in to comment.