-
Notifications
You must be signed in to change notification settings - Fork 74
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
support more emit modes in window #517
support more emit modes in window #517
Conversation
5776db7
to
a5e38d0
Compare
6caa777
to
11062b1
Compare
ce95d80
to
42cb940
Compare
src/Core/Streaming/Watermark.h
Outdated
/// TODO: more strategies ... | ||
}; | ||
|
||
enum class WatermarkEmitMode |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why we need this since the WatermarkStrategy already says this purpose ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This shall be just EmitMode
instead of WatermarkEmitMode or better EmitStrategy
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
WatermarkStrategy
tells us how / when to stamp watermark.
EmitStrategy
tells us how to do finalization (what to emit) when watermark has been observed.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
So for EmitStrategy, we only have 4 combinations :
emit all states
in a tumble / hop window or global window when a watermark is observed.emit only changed states per group key
in a timed window or global window when a watermark is observed. This emit strategy works for session window and substream by default from day oneemit changelog
, combined with 1) and 2), we have 2 here. But emit changelog for all statesprobably doesn't make sense at all,
emit changelog for updates` make lots of sense though
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
So the valid emit strategy for now could be
1. emit all states
2. emit only keyed and changed states
3. emit changelog for keyed and changed states
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Observing the timestamp, stamp watermark, assign window, emit states are all hand in hand things, we may need think about them carefully.
Previously we basically did everything (except emit states) in the stamper with session window as an exception.
I like the idea to separate the Observing the timestamp, stamp watermark
and assigning window
and emit states
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If we separate watermark stamping and window assignment, watermark itself could be pretty simple and having these use cases
- Ascending - basically proc time processing (EMIT ON PROCTIME) or row number processing (row number as a timestamp, per row emit in future). For every event in a chunk, stamp a timestamp for each of them. This seems a stateless watermark strategy (probably we still have state incase wall clock rewind / adjusted on that machine ?)
- BoundedOutOfOrderness - (EMIT ... WITH DELAY), observing the timestamp column for each event, maintain the max timestamp observed so far, if the out of order for an event is within delay, still assign a watermark to it, update the max observed timestamp as we go.
- OutOfOrdernessInWindowAndBatch - do we need that ? Since for tumble, hopping window, we know the delay automatically ? it seems a special case of 2 ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
For session window, do we need a watermark ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, there are more and more actions. Separating these can better maintain them.
In fact, EmitStrategy
should only be used within AggregatingTransform.
@@ -273,6 +273,9 @@ struct SessionWindowParams : WindowParams | |||
bool start_with_inclusion; | |||
bool end_with_inclusion; | |||
|
|||
/// TODO: So far, always assign session window in aggr | |||
bool assign_window_pushdown = true; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
/// So far, only for session window, we evaluate the watermark and window for the events in Aggregate Transform
/// For other windows, we assigned the watermark in Watermark stamper step.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Previously we are kind mixing window assignment and watermark assignment
for tumble / hopping / global windows since this is probably more performant.
Right now, we are separating these 2 steps, which are good for maintainability moving forward.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
For session window, fundamentally, it doesn't have a fixed watermark because it highly depends on inputs and the evaluation of start / end expression.
So, it seems window assignment for now all happens at Aggregate transform step ? The Watermark evaluation happens in WatermarkStamper step and for session window, we don't have watermark
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
pushdown_window_assignment
@@ -22,7 +24,8 @@ class AggregatingStep : public ITransformingStep | |||
size_t merge_threads_, | |||
size_t temporary_data_merge_threads_, | |||
bool emit_version_, | |||
bool emit_changelog_); | |||
bool emit_changelog_, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
bool emit_changelog_
is redundant moving forward since emit strategy shall cover that
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
do it in a separate PR
virtual void removeBucketsImpl(Int64 watermark, const SubstreamContextPtr & substream_ctx) = 0; | ||
virtual bool needReassignWindow() const = 0; | ||
|
||
std::optional<size_t> window_start_col_pos; | ||
std::optional<size_t> window_end_col_pos; | ||
|
||
bool only_emit_finalized_windows = true; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
these bools are basically a derive from the emit strategy enums ? It appears making the whole code path complex
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yes, there are different finalizing path in AggregatingTransform for different emit strategy
return DB::convertToChunk(params.aggregator.spliceAndConvertUpdatesToBlock(data, buckets)); | ||
} | ||
|
||
Chunk mergeAndSpliceAndConvertUpdatesToChunk( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We probably don't need Splice
in the name
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
mergeAndConvertUpdatesToChunk
is existed, it's used in global aggr.
mergeAndSpliceAndConvertUpdatesToChunk
is used in window aggr.
switch (params.type) | ||
{ | ||
case WindowType::TUMBLE: | ||
return TumbleHelper::validateWatermarkStrategyAndEmitMode(strategy, mode, assert_cast<TumbleWindowParams &>(params)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What's the benefit to use free functions compared with inheritance as we did switch and all type casts etc here
|
||
void processTimeout(Chunk & chunk); | ||
/// \brief Emit a watermark according to the EmitMode | ||
/// \p old_watermark_ts - saved watermark before current processing |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Let's consistently use either @brief, \brief, \param etc
, \param
has builtin support by CLION
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
do it in a separate PR
Int64 next_timeout_emit_ts = 0; | ||
Int64 timeout_interval = 0; | ||
/// For periodic or timeout timer | ||
InternalTimer periodic_timer; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If we leverage Stopwatch
we probably can do similar thing ?
General comments
For example, if we do For another example, if we do |
src/Core/Streaming/Watermark.h
Outdated
/// No watermark | ||
None, | ||
|
||
/// Watermark is generated based on each event, this is used for `EMIT AFTER WATERMARK WITHOUT DELAY` |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
For every event ? What's the use case ? Probably proc time processing ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
For session window, we need events timestamp is in ascending, which is required by session start predicates/end predicates
src/Core/Streaming/Watermark.h
Outdated
|
||
/// (Built-in): Watermark is generated based on a batch events: | ||
/// - Allow time skew in one batch events | ||
OutOfOrdernessInBatch, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
For every batch, we generate a watermark ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
unused, remove it
42cb940
to
ddd1a73
Compare
|
||
[[maybe_unused]] ALWAYS_INLINE static bool onlyEmitUpdates(EmitMode mode) noexcept | ||
{ | ||
return static_cast<uint8_t>(mode) & EMIT_UPDATES_MASK; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Let's revisit this, it appears something wrong here since we are stuffing bit into an enum ?
PR checklist:
proton: starts/ends
for new code in existing community code base ?Please write user-readable short description of the changes:
Support emit intermediate windows (if specified
emit periodic
oremit on update
)Support only emit update groups (if specified
emit on update
oremit periodic on update
Tweak emit clause syntax:
(MODIFED)
[AFTER WATERMARK WITH DELAY <interval>]
: Allow time skew in a certain range `[<max_event_ts - delay_interval>, <max_event_ts>](NEW)
[ON UPDATE]
: emit per update and only emit update pergroup by key
(NEW)
[PERIODIC <interval> ON UPDATE]
: periodic emit and only emit update pergroup by key