Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix smoke test for disable backfill sorting #477

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/proton_ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ jobs:
with:
ec2-instance-type: ${{ vars.X64_INSTANCE_TYPE }}
ec2-image-id: ${{ vars.X64_TEST_AMI }}
ec2-volume-size: '60'
ec2-volume-size: '80'
submodules: false
sanitizer: "address"
arch: ${{ vars.X64_ARCH }}
Expand Down
2 changes: 1 addition & 1 deletion src/Core/Settings.h
Original file line number Diff line number Diff line change
Expand Up @@ -790,7 +790,7 @@ static constexpr UInt64 operator""_GiB(unsigned long long value)
M(UInt64, keep_windows, 0, "How many streaming windows to keep from recycling", 0) \
M(String, seek_to, "", "Seeking to an offset of the streaming/historical store to seek", 0) \
M(Bool, enable_backfill_from_historical_store, true, "Enable backfill data from historical data store", 0) \
M(Bool, emit_aggregated_during_backfill, false, "Enable emit intermediate aggr result during backfill historical data", 0) \
M(Bool, emit_during_backfill, false, "Enable emit intermediate aggr result during backfill historical data", 0) \
M(Bool, force_backfill_in_order, false, "Requires backfill data in order", 0) \
M(Bool, include_internal_streams, false, "Show internal streams on SHOW streams query.", 0) \
M(UInt64, join_max_buffered_bytes, 524288000, "Max buffered bytes for stream to stream join", 0) \
Expand Down
12 changes: 10 additions & 2 deletions src/Interpreters/InterpreterSelectQuery.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@
#include <Interpreters/Streaming/EventPredicateVisitor.h>
#include <Interpreters/Streaming/IHashJoin.h>
#include <Interpreters/Streaming/PartitionByVisitor.h>
#include <Interpreters/Streaming/RewriteAsSubquery.h>
#include <Interpreters/Streaming/SubstituteStreamingFunction.h>
#include <Interpreters/Streaming/SyntaxAnalyzeUtils.h>
#include <Interpreters/Streaming/TableFunctionDescription.h>
Expand Down Expand Up @@ -2278,6 +2279,10 @@ void InterpreterSelectQuery::executeFetchColumns(QueryProcessingStage::Enum proc
if (!subquery)
throw Exception("Subquery expected", ErrorCodes::LOGICAL_ERROR);

/// proton: starts.
Streaming::rewriteSubquery(subquery->as<ASTSelectWithUnionQuery &>(), query_info);
/// proton: ends.

interpreter_subquery = std::make_unique<InterpreterSelectWithUnionQuery>(
subquery, getSubqueryContext(context),
options.copy().subquery().noModify(), required_columns);
Expand Down Expand Up @@ -3437,7 +3442,10 @@ void InterpreterSelectQuery::finalCheckAndOptimizeForStreamingQuery()
/// Usually, we don't care whether the backfilled data is in order. Excepts:
/// 1) User require backfill data in order
/// 2) User need window aggr emit result during backfill (it expects that process data in ascending event time)
if (settings.force_backfill_in_order.value || (settings.emit_aggregated_during_backfill.value && hasAggregation() && hasStreamingWindowFunc()))
if (settings.emit_during_backfill.value && hasAggregation() && hasStreamingWindowFunc())
context->setSetting("force_backfill_in_order", true);

if (settings.force_backfill_in_order.value)
query_info.require_in_order_backfill = true;
}
else
Expand Down Expand Up @@ -3500,7 +3508,7 @@ void InterpreterSelectQuery::buildWatermarkQueryPlan(QueryPlan & query_plan) con
auto params = std::make_shared<Streaming::WatermarkStamperParams>(
query_info.query, query_info.syntax_analyzer_result, query_info.streaming_window_params);

bool skip_stamping_for_backfill_data = !context->getSettingsRef().emit_aggregated_during_backfill.value;
bool skip_stamping_for_backfill_data = !context->getSettingsRef().emit_during_backfill.value;

if (query_info.hasPartitionByKeys())
query_plan.addStep(std::make_unique<Streaming::WatermarkStepWithSubstream>(
Expand Down
44 changes: 28 additions & 16 deletions src/Interpreters/Streaming/RewriteAsSubquery.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
#include <Parsers/ASTSubquery.h>
#include <Parsers/ASTTablesInSelectQuery.h>
#include <Parsers/Streaming/ASTEmitQuery.h>
#include <Storages/SelectQueryInfo.h>

namespace DB
{
Expand All @@ -19,6 +20,29 @@ extern const int ALIAS_REQUIRED;

namespace Streaming
{
namespace
{
bool rewriteAsChangelogQuery(ASTSelectWithUnionQuery & query)
{
if (query.list_of_selects->children.size() != 1)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Only expect one select query to rewrite as changelog query");

auto & select_query = query.list_of_selects->children[0]->as<ASTSelectQuery &>();

/// Emit changelog
auto emit_query = select_query.emit();
if (!emit_query)
emit_query = std::make_shared<ASTEmitQuery>();

if (emit_query->as<ASTEmitQuery &>().stream_mode == ASTEmitQuery::StreamMode::CHANGELOG)
return false;

emit_query->as<ASTEmitQuery &>().stream_mode = ASTEmitQuery::StreamMode::CHANGELOG;
select_query.setExpression(ASTSelectQuery::Expression::EMIT, std::move(emit_query));
return true;
}
}

ASTPtr rewriteAsSubquery(ASTTableExpression & table_expr)
{
if (table_expr.subquery)
Expand Down Expand Up @@ -113,24 +137,12 @@ bool rewriteAsChangelogSubquery(ASTTableExpression & table_expression, bool only
return rewriteAsChangelogQuery(query);
}

bool rewriteAsChangelogQuery(ASTSelectWithUnionQuery & query)
bool rewriteSubquery(ASTSelectWithUnionQuery & query, const SelectQueryInfo & query_info)
{
if (query.list_of_selects->children.size() != 1)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Only expect one select query to rewrite as changelog query");

auto & select_query = query.list_of_selects->children[0]->as<ASTSelectQuery &>();
if (query_info.left_input_tracking_changes)
return rewriteAsChangelogQuery(query);

/// Emit changelog
auto emit_query = select_query.emit();
if (!emit_query)
emit_query = std::make_shared<ASTEmitQuery>();

if (emit_query->as<ASTEmitQuery &>().stream_mode == ASTEmitQuery::StreamMode::CHANGELOG)
return false;

emit_query->as<ASTEmitQuery &>().stream_mode = ASTEmitQuery::StreamMode::CHANGELOG;
select_query.setExpression(ASTSelectQuery::Expression::EMIT, std::move(emit_query));
return true;
return false;
}
}
}
8 changes: 5 additions & 3 deletions src/Interpreters/Streaming/RewriteAsSubquery.h
Original file line number Diff line number Diff line change
Expand Up @@ -6,24 +6,26 @@ namespace DB
{
struct ASTTableExpression;
class ASTSelectWithUnionQuery;
struct SelectQueryInfo;

namespace Streaming
{
/// Rewrite `table/table_function` to subquery:
/// 1) `stream1` => `(select * from stream1) as stream1`
/// 2) `stream2 as t` => `(select * from stream2) as t`
/// 3) `table_func(...) as t1` => `(select * from table_func(...)) as t1`
/// Return rewritten subquery (return `nullptr` if is subquery)
/// \return rewritten subquery (return `nullptr` if is subquery)
ASTPtr rewriteAsSubquery(ASTTableExpression & table_expression);

/// Rewrite `table/table_function` to subquery (emit changelog):
/// 1) `stream1` => `(select * from stream1 emit changelog) as stream1`
/// 2) `stream2 as t` => `(select * from stream2 emit changelog) as t`
/// 3) `table_func(...) as t1` => `(select * from table_func(...) emit changelog) as t1`
/// 4) `(select * from stream1) as s` => `(select * from stream1 emit changelog) as s`
/// Return true if rewritten subquery, otherwise false (if already is changelog subquery or skip storage/table_function)
/// \return true if rewritten subquery, otherwise false (if already is changelog subquery or skip storage/table_function)
bool rewriteAsChangelogSubquery(ASTTableExpression & table_expression, bool only_rewrite_subquery);

bool rewriteAsChangelogQuery(ASTSelectWithUnionQuery & query);
/// \return true if query was rewritten and false otherwise
bool rewriteSubquery(ASTSelectWithUnionQuery & query, const SelectQueryInfo & query_info);
}
}
2 changes: 1 addition & 1 deletion src/Processors/Chunk.h
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ struct ChunkContext : public COW<ChunkContext>
if (hasWatermark())
{
flags &= ~WATERMARK_FLAG;
ts_1 = 0;
ts_1 = Streaming::INVALID_WATERMARK;
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ HopWatermarkStamper::HopWatermarkStamper(const WatermarkStamperParams & params_,
throw Exception(ErrorCodes::INCORRECT_QUERY, "{} doesn't support emit mode '{}'", getName(), magic_enum::enum_name(params.mode));
}

Int64 HopWatermarkStamper::calculateWatermark(Int64 event_ts) const
Int64 HopWatermarkStamper::calculateWatermarkImpl(Int64 event_ts) const
{
auto last_finalized_window = HopHelper::getLastFinalizedWindow(event_ts, window_params);
if (likely(last_finalized_window.isValid()))
Expand Down
2 changes: 1 addition & 1 deletion src/Processors/Transforms/Streaming/HopWatermarkStamper.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ class HopWatermarkStamper final : public WatermarkStamper
WatermarkStamperPtr clone() const override { return std::make_unique<HopWatermarkStamper>(*this); }

private:
Int64 calculateWatermark(Int64 event_ts) const override;
Int64 calculateWatermarkImpl(Int64 event_ts) const override;

HopWindowParams & window_params;
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ TumbleWatermarkStamper::TumbleWatermarkStamper(const WatermarkStamperParams & pa
throw Exception(ErrorCodes::INCORRECT_QUERY, "{} doesn't support emit mode '{}'", getName(), magic_enum::enum_name(params.mode));
}

Int64 TumbleWatermarkStamper::calculateWatermark(Int64 event_ts) const
Int64 TumbleWatermarkStamper::calculateWatermarkImpl(Int64 event_ts) const
{
return toStartTime(
event_ts, window_params.interval_kind, window_params.window_interval, *window_params.time_zone, window_params.time_scale);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ class TumbleWatermarkStamper final : public WatermarkStamper
WatermarkStamperPtr clone() const override { return std::make_unique<TumbleWatermarkStamper>(*this); }

private:
Int64 calculateWatermark(Int64 event_ts) const override;
Int64 calculateWatermarkImpl(Int64 event_ts) const override;

TumbleWindowParams & window_params;
};
Expand Down
114 changes: 85 additions & 29 deletions src/Processors/Transforms/Streaming/WatermarkStamper.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,87 @@ void WatermarkStamper::preProcess(const Block & header)
initTimeoutTimer(params.timeout_interval);
}

ALWAYS_INLINE Int64 WatermarkStamper::calculateWatermark(Int64 event_ts) const
{
if (params.delay_interval)
{
auto event_ts_bias = addTime(
event_ts,
params.delay_interval.unit,
-1 * params.delay_interval.interval,
*params.window_params->time_zone,
params.window_params->time_scale);

return calculateWatermarkImpl(event_ts_bias);
}
else
return calculateWatermarkImpl(event_ts);
}

ALWAYS_INLINE Int64 WatermarkStamper::calculateWatermarkPerRow(Int64 event_ts) const
{
if (params.delay_interval)
return addTime(
event_ts,
params.delay_interval.unit,
-1 * params.delay_interval.interval,
*params.window_params->time_zone,
params.window_params->time_scale);
else
return event_ts;
}

void WatermarkStamper::processAfterUnmuted(Chunk & chunk)
{
assert(!chunk.hasRows());

switch (params.mode)
{
case WatermarkStamperParams::EmitMode::PERIODIC:
{
processPeriodic(chunk);
break;
}
case WatermarkStamperParams::EmitMode::WATERMARK:
{
auto muted_watermark_ts = calculateWatermark(max_event_ts);
if (muted_watermark_ts != INVALID_WATERMARK) [[likely]]
chunk.setWatermark(muted_watermark_ts);
break;
}
case WatermarkStamperParams::EmitMode::WATERMARK_PER_ROW:
{
auto muted_watermark_ts = calculateWatermarkPerRow(max_event_ts);
if (muted_watermark_ts != INVALID_WATERMARK) [[likely]]
chunk.setWatermark(muted_watermark_ts);
break;
}
default:
break;
}
}

void WatermarkStamper::processWithMutedWatermark(Chunk & chunk)
{
/// NOTE: In order to avoid that when there is only backfill data and no new data, the window aggregation don't emit results after the backfill is completed.
/// Even mute watermark, we still need collect `max_event_ts` which will be used in "processAfterUnmuted()" to emit a watermark as soon as the backfill is completed
if (chunk.hasRows() && (params.mode == WatermarkStamperParams::EmitMode::WATERMARK || params.mode == WatermarkStamperParams::EmitMode::WATERMARK_PER_ROW))
{
assert(params.window_params);
if (params.window_params->time_col_is_datetime64)
max_event_ts = std::max<Int64>(
max_event_ts,
*std::ranges::max_element(assert_cast<const ColumnDateTime64 &>(*chunk.getColumns()[time_col_pos]).getData()));
else
max_event_ts = std::max<Int64>(
max_event_ts,
*std::ranges::max_element(assert_cast<const ColumnDateTime &>(*chunk.getColumns()[time_col_pos]).getData()));
}

processTimeout(chunk);
logLateEvents();
}

void WatermarkStamper::process(Chunk & chunk)
yl-lisen marked this conversation as resolved.
Show resolved Hide resolved
{
switch (params.mode)
Expand Down Expand Up @@ -238,31 +319,6 @@ void WatermarkStamper::processWatermark(Chunk & chunk)

assert(params.window_params);

std::function<Int64(Int64)> calc_watermark_ts;
if (params.delay_interval)
{
calc_watermark_ts = [this](Int64 event_ts) {
auto event_ts_bias = addTime(
event_ts,
params.delay_interval.unit,
-1 * params.delay_interval.interval,
*params.window_params->time_zone,
params.window_params->time_scale);

if constexpr (apply_watermark_per_row)
return event_ts_bias;
else
return calculateWatermark(event_ts_bias);
};
}
else
{
if constexpr (apply_watermark_per_row)
calc_watermark_ts = [](Int64 event_ts) { return event_ts; };
else
calc_watermark_ts = [this](Int64 event_ts) { return calculateWatermark(event_ts); };
}

Int64 event_ts_watermark = watermark_ts;

/// [Process chunks]
Expand All @@ -284,7 +340,7 @@ void WatermarkStamper::processWatermark(Chunk & chunk)
max_event_ts = event_ts;

if constexpr (apply_watermark_per_row)
event_ts_watermark = calc_watermark_ts(max_event_ts);
event_ts_watermark = calculateWatermarkPerRow(max_event_ts);
}

if (unlikely(event_ts < event_ts_watermark))
Expand All @@ -295,7 +351,7 @@ void WatermarkStamper::processWatermark(Chunk & chunk)
}

if constexpr (!apply_watermark_per_row)
event_ts_watermark = calc_watermark_ts(max_event_ts);
event_ts_watermark = calculateWatermark(max_event_ts);

if (late_events_in_chunk > 0)
{
Expand All @@ -315,9 +371,9 @@ void WatermarkStamper::processWatermark(Chunk & chunk)
}
}

Int64 WatermarkStamper::calculateWatermark(Int64 event_ts) const
Int64 WatermarkStamper::calculateWatermarkImpl(Int64 event_ts) const
{
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "calculateWatermark() not implemented in {}", getName());
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "calculateWatermarkImpl() not implemented in {}", getName());
}

void WatermarkStamper::initPeriodicTimer(const WindowInterval & interval)
Expand Down
11 changes: 10 additions & 1 deletion src/Processors/Transforms/Streaming/WatermarkStamper.h
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,14 @@ SERDE class WatermarkStamper
virtual String getName() const { return "WatermarkStamper"; }

void preProcess(const Block & header);

void process(Chunk & chunk);

/// During mute watermark, we still need to process the chunk to update max_event_ts
void processWithMutedWatermark(Chunk & chunk);

void processAfterUnmuted(Chunk & chunk);

bool requiresPeriodicOrTimeoutEmit() const { return periodic_interval || timeout_interval; }

VersionType getVersion() const;
Expand All @@ -80,7 +86,10 @@ SERDE class WatermarkStamper

void logLateEvents();

virtual Int64 calculateWatermark(Int64 event_ts) const;
ALWAYS_INLINE Int64 calculateWatermark(Int64 event_ts) const;
ALWAYS_INLINE Int64 calculateWatermarkPerRow(Int64 event_ts) const;

virtual Int64 calculateWatermarkImpl(Int64 event_ts) const;

void initPeriodicTimer(const WindowInterval & interval);

Expand Down
Loading
Loading