support more emit modes in window #517

Merged

Collaborator

@yl-lisen yl-lisen commented Jan 23, 2024

PR checklist:

  • Did you run ClangFormat?
  • Did you separate headers into a different section in the existing community code base?
  • Did you surround new code in the existing community code base with proton: starts/ends?

Please write user-readable short description of the changes:

  • Support emitting intermediate windows (if EMIT PERIODIC or EMIT ON UPDATE is specified)

  • Support emitting only updated groups (if EMIT ON UPDATE or EMIT PERIODIC ... ON UPDATE is specified)

  • Tweak emit clause syntax:

EMIT [STREAM|CHANGELOG]
    [AFTER WATERMARK WITH DELAY <interval>]
    [PERIODIC <interval>]
    [ON UPDATE]
    [[ AND ]TIMEOUT <interval>]
    [[ AND ]LAST <last-x> [ON PROCTIME]]

(MODIFIED)
[AFTER WATERMARK WITH DELAY <interval>] : Allow time skew in a certain range `[<max_event_ts - delay_interval>, <max_event_ts>]`

(NEW) [ON UPDATE] : emit on each update, and only emit the updated group-by keys
(NEW) [PERIODIC <interval> ON UPDATE] : emit periodically, and only emit the updated group-by keys

@yl-lisen yl-lisen self-assigned this Jan 23, 2024
@yl-lisen yl-lisen force-pushed the feature/issue-440-support-more-emit-modes-in-window branch from 5776db7 to a5e38d0 Compare January 23, 2024 08:17
@yl-lisen yl-lisen marked this pull request as ready for review January 23, 2024 09:45
@yl-lisen yl-lisen requested a review from chenziliang January 23, 2024 16:06
@yl-lisen yl-lisen force-pushed the feature/issue-440-support-more-emit-modes-in-window branch from 6caa777 to 11062b1 Compare January 24, 2024 09:24
@yl-lisen yl-lisen force-pushed the feature/issue-440-support-more-emit-modes-in-window branch from ce95d80 to 42cb940 Compare February 5, 2024 22:57
/// TODO: more strategies ...
};

enum class WatermarkEmitMode
Collaborator

Why do we need this, given that WatermarkStrategy already serves this purpose?

Collaborator

@chenziliang chenziliang Feb 11, 2024

This should be just EmitMode instead of WatermarkEmitMode, or better, EmitStrategy?

Collaborator

@chenziliang chenziliang Feb 11, 2024

WatermarkStrategy tells us how and when to stamp a watermark.
EmitStrategy tells us how to do finalization (what to emit) when a watermark has been observed.

Collaborator

@chenziliang chenziliang Feb 11, 2024

So for EmitStrategy, we only have 4 combinations:

  1. Emit all states in a tumble / hop window or global window when a watermark is observed.
  2. Emit only changed states per group key in a timed window or global window when a watermark is observed. This emit strategy has worked for session window and substream by default from day one.
  3. Emit changelog. Combined with 1) and 2), we have 2 more here. But `emit changelog for all states` probably doesn't make sense at all; `emit changelog for updates` makes lots of sense though.

Collaborator

@chenziliang chenziliang Feb 11, 2024

So the valid emit strategies for now could be:

1. emit all states
2. emit only keyed and changed states
3. emit changelog for keyed and changed states
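
As a rough illustration of the three strategies listed above, here is a minimal sketch of how they could be expressed as a single enum with derived predicates. The names `EmitStrategy`, `makeEmitStrategy`, `onlyEmitsUpdates` and `emitsChangelog` are hypothetical and chosen for this example only; they are not taken from the Proton code base.

```cpp
#include <cassert>
#include <cstdint>

// Hypothetical enum: one value per valid strategy from the list above.
enum class EmitStrategy : uint8_t
{
    AllStates,                 // 1. emit all states
    UpdatedStates,             // 2. emit only keyed and changed states
    ChangelogForUpdatedStates, // 3. emit changelog for keyed and changed states
};

// Derived predicates, so callers do not need separate bool members.
constexpr bool onlyEmitsUpdates(EmitStrategy s) noexcept
{
    return s != EmitStrategy::AllStates;
}

constexpr bool emitsChangelog(EmitStrategy s) noexcept
{
    return s == EmitStrategy::ChangelogForUpdatedStates;
}

// Maps two parsed options onto a strategy (assumed inputs, for illustration).
// "Changelog for all states" is the combination the review considers
// not meaningful, so it is rejected here.
inline EmitStrategy makeEmitStrategy(bool only_updates, bool changelog)
{
    assert(!(changelog && !only_updates));
    if (changelog)
        return EmitStrategy::ChangelogForUpdatedStates;
    return only_updates ? EmitStrategy::UpdatedStates : EmitStrategy::AllStates;
}
```

With this shape, code that today checks several bools could instead ask `onlyEmitsUpdates(strategy)` or `emitsChangelog(strategy)`; whether that fits the real finalization paths is an open question.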

Collaborator

Observing the timestamp, stamping the watermark, assigning the window, and emitting states all go hand in hand; we may need to think about them carefully.

Previously we basically did everything (except emitting states) in the stamper, with session window as an exception.

I like the idea of separating observing the timestamp, stamping the watermark, assigning the window, and emitting states.

Collaborator

If we separate watermark stamping and window assignment, the watermark itself could be pretty simple, with these use cases:

  1. Ascending - basically proc time processing (EMIT ON PROCTIME) or row number processing (row number as a timestamp, per-row emit in the future). For every event in a chunk, stamp a timestamp on it. This seems to be a stateless watermark strategy (we probably still need state in case the wall clock is rewound / adjusted on that machine?)
  2. BoundedOutOfOrderness - (EMIT ... WITH DELAY): observe the timestamp column of each event and maintain the max timestamp observed so far. If an event's out-of-orderness is within the delay, still assign a watermark to it; update the max observed timestamp as we go.
  3. OutOfOrdernessInWindowAndBatch - do we need that? For tumble and hopping windows we know the delay automatically, so it seems to be a special case of 2?
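
The BoundedOutOfOrderness case (item 2 above) can be sketched as a small tracker. This is a minimal sketch under assumptions: the class name `BoundedOutOfOrdernessWatermark`, its methods, and the choice of `max_event_ts - delay` as the watermark value are illustrative and not the actual WatermarkStamper implementation. Timestamps are plain integers in an arbitrary unit.

```cpp
#include <cassert>
#include <cstdint>
#include <limits>

// Hypothetical helper: tracks the max observed event timestamp and accepts
// events whose timestamp lies in [max_event_ts - delay, max_event_ts].
class BoundedOutOfOrdernessWatermark
{
public:
    explicit BoundedOutOfOrdernessWatermark(int64_t delay_) : delay(delay_)
    {
        assert(delay_ >= 0);
    }

    // Observes one event timestamp.
    // Returns true if the event advances the max timestamp or is within the
    // allowed skew; returns false if it is older than max_event_ts - delay.
    bool observe(int64_t event_ts)
    {
        if (!has_events || event_ts > max_event_ts)
        {
            max_event_ts = event_ts;
            has_events = true;
            return true;
        }
        return event_ts >= max_event_ts - delay;
    }

    // Current watermark; the lowest int64 value means "no event seen yet".
    int64_t watermark() const
    {
        if (!has_events)
            return std::numeric_limits<int64_t>::min();
        return max_event_ts - delay;
    }

private:
    int64_t delay;
    int64_t max_event_ts = 0;
    bool has_events = false;
};
```

For example, with a delay of 5, observing timestamps 100, 97, 90 in that order accepts the first two and flags the third as late, while the watermark stays at 95 until a larger timestamp arrives.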

Collaborator

For session window, do we need a watermark?

Collaborator Author

@yl-lisen yl-lisen Feb 12, 2024

Yes, there are more and more actions. Separating them makes them easier to maintain.
In fact, EmitStrategy should only be used within AggregatingTransform.

@@ -273,6 +273,9 @@ struct SessionWindowParams : WindowParams
bool start_with_inclusion;
bool end_with_inclusion;

/// TODO: So far, always assign session window in aggr
bool assign_window_pushdown = true;
Collaborator

@chenziliang chenziliang Feb 11, 2024

/// So far, only for session window, we evaluate the watermark and window for the events in Aggregate Transform
/// For other windows, we assign the watermark in the Watermark stamper step.

Collaborator

Previously we were kind of mixing window assignment and watermark assignment for tumble / hopping / global windows, since this is probably more performant.

Right now, we are separating these 2 steps, which is good for maintainability moving forward.

Collaborator

For session window, fundamentally, it doesn't have a fixed watermark because it highly depends on the inputs and the evaluation of the start / end expressions.

So, it seems window assignment for now all happens at the Aggregate transform step? The watermark evaluation happens in the WatermarkStamper step, and for session window we don't have a watermark.

Collaborator

pushdown_window_assignment

@@ -22,7 +24,8 @@ class AggregatingStep : public ITransformingStep
size_t merge_threads_,
size_t temporary_data_merge_threads_,
bool emit_version_,
bool emit_changelog_);
bool emit_changelog_,
Collaborator

@chenziliang chenziliang Feb 12, 2024

bool emit_changelog_ is redundant moving forward since emit strategy shall cover that

Collaborator Author

do it in a separate PR

virtual void removeBucketsImpl(Int64 watermark, const SubstreamContextPtr & substream_ctx) = 0;
virtual bool needReassignWindow() const = 0;

std::optional<size_t> window_start_col_pos;
std::optional<size_t> window_end_col_pos;

bool only_emit_finalized_windows = true;
Collaborator

These bools are basically derived from the emit strategy enums? They appear to make the whole code path complex.

Collaborator Author

Yes, there are different finalizing paths in AggregatingTransform for different emit strategies.

return DB::convertToChunk(params.aggregator.spliceAndConvertUpdatesToBlock(data, buckets));
}

Chunk mergeAndSpliceAndConvertUpdatesToChunk(
Collaborator

We probably don't need Splice in the name

Collaborator Author

mergeAndConvertUpdatesToChunk already exists; it's used in global aggr.
mergeAndSpliceAndConvertUpdatesToChunk is used in window aggr.

switch (params.type)
{
case WindowType::TUMBLE:
return TumbleHelper::validateWatermarkStrategyAndEmitMode(strategy, mode, assert_cast<TumbleWindowParams &>(params));
Collaborator

What's the benefit of using free functions compared with inheritance, given that we need the switch and all the type casts here?
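
To make the trade-off in this question concrete, here is a small sketch that puts the two styles side by side. Everything in it is an assumption for illustration: the types `WindowParams`, `TumbleParams`, `HopParams`, the function `validateFree`, and the validation rules themselves are invented and do not mirror the real `TumbleHelper::validateWatermarkStrategyAndEmitMode`.

```cpp
#include <cassert>
#include <cstdint>

enum class WindowType : uint8_t { Tumble, Hop };

// Inheritance style: each window type validates itself via a virtual call.
struct WindowParams
{
    virtual ~WindowParams() = default;
    virtual WindowType type() const noexcept = 0;
    virtual bool validate() const noexcept = 0;
};

struct TumbleParams final : WindowParams
{
    int64_t interval;
    explicit TumbleParams(int64_t interval_) : interval(interval_) {}
    WindowType type() const noexcept override { return WindowType::Tumble; }
    // Invented rule: the window interval must be positive.
    bool validate() const noexcept override { return interval > 0; }
};

struct HopParams final : WindowParams
{
    int64_t interval;
    int64_t slide;
    HopParams(int64_t interval_, int64_t slide_) : interval(interval_), slide(slide_) {}
    WindowType type() const noexcept override { return WindowType::Hop; }
    // Invented rule: positive sizes, and the slide must not exceed the interval.
    bool validate() const noexcept override { return interval > 0 && slide > 0 && slide <= interval; }
};

// Free-function style: one dispatcher with a switch and a downcast per type.
inline bool validateFree(const WindowParams & params)
{
    switch (params.type())
    {
        case WindowType::Tumble:
            return static_cast<const TumbleParams &>(params).interval > 0;
        case WindowType::Hop:
        {
            const auto & hop = static_cast<const HopParams &>(params);
            return hop.interval > 0 && hop.slide > 0 && hop.slide <= hop.interval;
        }
    }
    assert(false && "unknown window type");
    return false;
}
```

Both versions return the same result. The virtual version keeps each rule next to its type, while the free-function version keeps all rules in one place at the cost of the switch and casts.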


void processTimeout(Chunk & chunk);
/// \brief Emit a watermark according to the EmitMode
/// \p old_watermark_ts - saved watermark before current processing
Collaborator

Let's consistently use either @brief or \brief, \param, etc.; \param has built-in support in CLion.

Collaborator Author

do it in a separate PR

Int64 next_timeout_emit_ts = 0;
Int64 timeout_interval = 0;
/// For periodic or timeout timer
InternalTimer periodic_timer;
Collaborator

If we leverage Stopwatch, we can probably do a similar thing?
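
For reference, a periodic timer of this kind needs very little state. The sketch below uses only `std::chrono` and is not the `InternalTimer` from the diff nor the existing `Stopwatch` class; the name `PeriodicTimer` and its interface are assumptions. The current time is passed in explicitly so the behaviour is deterministic.

```cpp
#include <cassert>
#include <chrono>

// Hypothetical periodic timer: reports expiry at most once per interval.
class PeriodicTimer
{
public:
    using Clock = std::chrono::steady_clock;

    PeriodicTimer(Clock::duration interval_, Clock::time_point start)
        : interval(interval_), next_due(start + interval_)
    {
        assert(interval_ > Clock::duration::zero());
    }

    // Returns true if `now` has reached the next due time, and re-arms the
    // timer one interval after `now`. Returns false otherwise.
    bool expired(Clock::time_point now)
    {
        if (now < next_due)
            return false;
        next_due = now + interval;
        return true;
    }

private:
    Clock::duration interval;
    Clock::time_point next_due;
};
```

In a processing loop one would call `timer.expired(PeriodicTimer::Clock::now())` after each chunk and trigger a periodic or timeout emit when it returns true.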

@chenziliang
Collaborator

chenziliang commented Feb 12, 2024

General comments

  1. We have so many different enum classes / types regarding emit strategy, watermark strategy, emit on updates, emit finalized windows, bools, etc., and I think they can be simplified. They are so complex right now that I worry quite a lot this is way more complex than any other Proton team member can maintain.
  2. Comments: we will need to write lots of comments to explain what the important functions do and how they work.
  3. Free functions vs. inheritance (class / struct): free functions appear easier to reuse, but too many free functions and switch-case / if-else chains may also cause maintenance issues.
  4. I generally agree we shall refactor watermark and window assignment into different classes, but this refactor is expected to happen in a separate PR.
  5. Emit behavior: it appears we still don't emit per group-by key for EMIT ON UPDATE, but for the whole window; this is probably NOT what we want in general.

For example, if we do SELECT k, count() FROM test GROUP BY k EMIT ON UPDATE, it only emits for keys with updates, for keys without any updates, don't emit anything.

For another example, if we do SELECT window_start, device, count() FROM tumble(device_metrics, 5s) GROUP BY window_start, device EMIT ON UPDATE, it is expected to emit only on specific device with updates in the current tumble window. For devices without updates in the current window, don't emit.
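
The expected behaviour in the two queries above can be sketched with a toy aggregator that remembers which keys changed since the last emit. This is only an illustration of the semantics; `CountPerKey` and its methods are hypothetical and unrelated to the real Aggregator. For the windowed query, the key would be the pair (window_start, device) rather than a single string.

```cpp
#include <cassert>
#include <cstdint>
#include <map>
#include <set>
#include <string>
#include <utility>
#include <vector>

// Hypothetical model of `SELECT k, count() ... GROUP BY k EMIT ON UPDATE`.
class CountPerKey
{
public:
    // Counts one event for `key` and marks the key as updated.
    void add(const std::string & key)
    {
        ++counts[key];
        updated.insert(key);
    }

    // Emits (key, count) only for keys updated since the previous emit,
    // in key order, then clears the update marks.
    std::vector<std::pair<std::string, uint64_t>> emitUpdates()
    {
        std::vector<std::pair<std::string, uint64_t>> out;
        out.reserve(updated.size());
        for (const auto & key : updated)
        {
            auto it = counts.find(key);
            assert(it != counts.end()); // every updated key has a count
            out.emplace_back(it->first, it->second);
        }
        updated.clear();
        return out;
    }

private:
    std::map<std::string, uint64_t> counts;
    std::set<std::string> updated;
};
```

After `add("a")`, `add("b")`, `add("a")`, the first `emitUpdates()` returns both keys; if only `add("b")` follows, the next call returns just `b` with its running count, and keys without updates are not emitted.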

/// No watermark
None,

/// Watermark is generated based on each event, this is used for `EMIT AFTER WATERMARK WITHOUT DELAY`
Collaborator

@chenziliang chenziliang Feb 12, 2024

For every event? What's the use case? Probably proc time processing?

Collaborator Author

@yl-lisen yl-lisen Feb 12, 2024

For session window, we need event timestamps to be in ascending order, which is required by the session start/end predicates.


/// (Built-in): Watermark is generated based on a batch events:
/// - Allow time skew in one batch events
OutOfOrdernessInBatch,
Collaborator

For every batch, we generate a watermark?

Collaborator Author

unused, remove it

@yl-lisen yl-lisen force-pushed the feature/issue-440-support-more-emit-modes-in-window branch from 42cb940 to ddd1a73 Compare February 12, 2024 16:07
@yl-lisen yl-lisen requested a review from chenziliang February 12, 2024 16:09

[[maybe_unused]] ALWAYS_INLINE static bool onlyEmitUpdates(EmitMode mode) noexcept
{
return static_cast<uint8_t>(mode) & EMIT_UPDATES_MASK;
Collaborator

@chenziliang chenziliang Feb 12, 2024

Let's revisit this; something appears wrong here, since we are stuffing bits into an enum?

@chenziliang chenziliang merged commit 2cea82f into develop Feb 12, 2024
6 checks passed
@yl-lisen yl-lisen mentioned this pull request Mar 11, 2024