Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

docs: klip-10 add suppress to KSQL #3754

Merged
merged 4 commits into from
May 22, 2020
Merged

Conversation

agavra
Copy link
Contributor

@agavra agavra commented Nov 5, 2019

KLIP 10 - Add Suppress To KSQL

Author: agavra |
Release Target: 5.5 |
Status: In Discussion |
Discussion: link to the design discussion PR

tl;dr: There have been many requests from the community to be able to the control continuous
refinement policy of underlying KStreams applications. KTable#suppress allows such
control and should be given corresponding KSQL syntax.

## Value/Return

This feature enables functionality in KSQL that is not possible today, and provides stricter
semantic guarantees on KSQL aggregation queries.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

provides stricter semantic guarantees

Why that?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I suppose I meant "stricter control on the semantics of aggregations" - you could rely on certain assumptions that previously you could not.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This was one of the features/observations from the original KIP: when the application is taking actions in response to emitted events, then the emission behavior itself also has semantics, and the app developer needs to know what they are.

However, it's probably worth keeping our heads straight about whether we're talking about SQL/relational semantics vs. emit semantics, since they're different dimensions, and the discussion could become confusing.

And behaves in the following ways:
- If `CHANGES` is specified, then all intermediate changes will be materialized
- If `FINAL` is specified, then output will be suppressed depending on the `suppression`:
- if `EAGERLY` is specified, the suppression is a best effort attempt to reduce duplicate output
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is the difference between CHANGES and FINAL EAGERLY? To me, they appear to be the same.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah I was under the assumption that CHANGES would be "one-to-one" (e.g. something like no buffering) - I've had some people in the community ask for that feature. Is that possible with the suppress API?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

After discussing with @vpapavas a little more - I think I've clarified my own thoughts about the difference:

  • EMIT CHANGES uses just the buffer size
  • EMIT FINAL EAGERLY would use either the buffer size or the window size, whatever comes first

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

are we set on EMIT FINAL ? I goes back to #3242 (comment)

I am concerned about introducing a new FINAL mode here. Could we do something like adding a new SUPRESS UNTIL <supression_mechanism> clause ? (`SUPRESS

  • IMO having supress in the syntax would also make it easier for users to reason across streams and ksql. Its anyway doing a Streams supress
  • Can EAGERLY be a supression scheme instead of its own clause.. For with EAGERLY WHEN WINDOW_CLOSED() , user would be 1 record per window as long as it fits in buffer?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Look at the comments below that John posted - I think it helps clarify a lot of the streams behaviors that will make me need to revamp a lot of this.

Are we set on EMIT FINAL?

I think we can change FINAL to something else if we want, but I think we should stick with EMIT and not introduce SUPPRESS - I don't agree that mirroring Streams terminology is necessarily a design goal. One candidate is maybe EMIT WHEN [supression_condition].

Can EAGERLY be a suppression scheme instead of its own clause

I think I'm going to remove the concept of EAGERLY - it looks like I misunderstood how that works. Again, I will defer to John's comments below on the semantics that I misunderstood.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

EMIT FINAL also fits in with the direction other streaming SQL impls are going... i.e. Beam/Flink.


We will model the suppression as method calls in order to allow flexibility in the syntax if
different types of suppressions are added in the future. We could also extend this to allow user
implemented suppressions in the same way that we support UDFs.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Interesting idea -- this is not supported in Kafka Streams atm, and it's unclear to me who this would work? It's not part of this proposal, but as you outline the idea I am wondering how this could look like?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah I assumed that anyone who implemented the Suppressed interface could plug into the interface. If that's not the intention then maybe I can simplify the syntax - but I wanted to leave it open in case that was the idea.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Makes sense. For clarity, people don't implement Suppressed. In fact, this is expressly forbidden. Instead, they use the static methods on Suppressed to configure the operation.

Copy link
Contributor

@big-andy-coates big-andy-coates Nov 27, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@vvcephei is there any intent to open this up in the future, or does that just not work?

@vvcephei
Copy link
Member

vvcephei commented Nov 5, 2019

Hey, thanks @agavra for taking on this design task! I think that a lot of people will get utility from this feature, and I like the way you've posed it.

Reading the design, it seems like the expression space looks like this:

expression notes
EMIT CHANGES This seems like it would just rely on whatever behavior the record cache configuration produces.
EMIT FINAL EAGERLY WHEN TIME_LIMIT(time_expression) This is more like a "rate limiting" configuration, but since Suppression has been published, this has been probably the number 2 most confusing and dubiously useful configuration. There's inherent complexity in whether you mean "stream time" or "wall clock time" when you say "TIME_LIMIT". Streams currently only supports stream time, and I'd say colloquially that approximately no one finds this useful. Rather, people keep falling into the trap of thinking that it's "wall clock time", which would be useful, but isn't currently supported.
EMIT FINAL EAGERLY WHEN WINDOW_CLOSED() This configuration actually doesn't make sense at all. You cannot both emit eagerly and also only emit when the window closes. In fact, if you try to implement it, you'll find there's no way to express it in Streams.
EMIT FINAL WHEN WINDOW_CLOSED() This is the most useful configuration for window processing and eventing use cases.
EMIT FINAL WHEN TIME_LIMIT(time_expression) This configuration actually doesn't make sense either. If the emit happens based on the time limit, then it is by definition not also the final result, right?

With all of the above in mind, and also with some of the battle scars of having people actually use Suppress in Streams, I'm wondering if we should consider an alternative arrangement:

expression notes
EMIT CHANGES Continuously emit intermediate results. This syntax leaves open the possibility to add further modifiers like EMIT CHANGES [when_expression], where when_expression is something like (just a spitball) `EVERY time_expression [IN STREAM_TIME
EMIT FINAL Continuously emit the final results for each window/key pair as after the window closes. This is only valid if the table is windowed, which presents its own validation difficulty. Well, I guess it's also "valid" for non-windowed tables, it's just that the results are never final, so the correct output is no output for all eternity, followed by the current state of each key at the end of time.

There's one extra semantic difficulty you might want to consider: it does make sense to issue a "point in time" query, but request only final results. I wonder if you want to account for that in the query syntax, or instead just build on the nature of SQL. I.e., if there's a way to define the "final results" relation (as a streaming table) and then collect it into a "point in time" query, like SELECT * FROM (SELECT ... EMIT FINAL). Food for thought.

Also, while bikeshedding the strawman above, I'm wondering if you can let us know what kind of syntax other streaming SQL dialects use for this concept. I'm not saying we should use the same syntax, but I think it would help to know what they are.

Last comment: You said that grace period would be out of scope. What would be the semantics of FINAL, then? I.e., is the grace period just zero for now? It matters because FINAL is specifically defined by the window close time, which in turn is partially determined by the grace period.

@agavra
Copy link
Contributor Author

agavra commented Nov 5, 2019

Thanks for the thoughts @vvcephei! Really helps my understanding of KStreams.

EMIT FINAL EAGERLY WHEN TIME_LIMIT(time_expression) This is more like a "rate limiting" configuration, but since Suppression has been published, this has been probably the number 2 most confusing and dubiously useful configuration. There's inherent complexity in whether you mean "stream time" or "wall clock time" when you say "TIME_LIMIT". Streams currently only supports stream time, and I'd say colloquially that approximately no one finds this useful. Rather, people keep falling into the trap of thinking that it's "wall clock time", which would be useful, but isn't currently supported.

Good to know that this hasn't been useful or well understood in practice. I think trying to support this is what introduced most of the complexity was in this KLIP.

EMIT FINAL EAGERLY WHEN WINDOW_CLOSED() This configuration actually doesn't make sense at all. You cannot both emit eagerly and also only emit when the window closes. In fact, if you try to implement it, you'll find there's no way to express it in Streams.

I just saw that untilWindowCloses takes only a StrictBufferConfig. I imagined that this would mean "try your best to emit only the final, but I prefer that you emit intermediate than hit OOM" - it would mean a hint that the downstream can handle duplicates, but does not need to see the duplicates.

EMIT FINAL WHEN TIME_LIMIT(time_expression) This configuration actually doesn't make sense either. If the emit happens based on the time limit, then it is by definition not also the final result, right?

The way I imagined this is that the WHEN expression "defines" the FINAL. e.g. I consider a record FINAL when it hits the time limit, not when the window is closed. Thinking about this, though, I can see why this might not make much sense in production use cases.


I think overall given this understanding of KS, it makes a lot of sense to just support EMIT CHANGES and EMIT FINAL for now the way that you suggest it and leave it open to the extensions if we need them later.

With regards to "point in time" queries, we've ditched that terminology in favor of a clearer taxonomy of queries. There are two types of transient user queries - those that stream results back and those that "batch" results back. I don't think EMIT CHANGES makes sense for those that batch results back (and implementing that would be impossible since it just looks at the state store directly anyway!). EMIT CHANGES/EMIT FINAL are both valid for the transient queries that stream results back (i.e. push queries) and this proposal would cover that in the same way as it does persistent queries.

I'm wondering if you can let us know what kind of syntax other streaming SQL dialects use for this concept.

https://arxiv.org/pdf/1905.12133.pdf (they use EMIT STREAM to define changelog - our equivalent of EMIT CHANGES)

Last comment: You said that grace period would be out of scope. What would be the semantics of FINAL, then? I.e., is the grace period just zero for now? It matters because FINAL is specifically defined by the window close time, which in turn is partially determined by the grace period.

The way I see it is that grace period is a property of the window. Today (I think) we use whatever the default is for the grace period and don't supply any syntax to help set it. We will use that grace period to determine what "FINAL" means. Later we could introduce syntax in the WINDOW expression to specify the grace period and this would get that "for free" if I understand correctly.

@vvcephei
Copy link
Member

vvcephei commented Nov 6, 2019

Hi @agavra , thanks for the response.

Regarding eager+final, you're not the only person to consider these semantics and reach the same conclusion, but IMO, something like "try your best to emit only the final, but I prefer that you emit intermediate than hit OOM" is the kind of thing that users ask for, but don't really want. I.e., the semantics would be very esoteric and almost certainly wrong for most use cases.

One guiding principle of the feature is that either the downstream can handle duplicates and therefore doesn't need "final results only", or it cannot handle duplicates and therefore really needs "final results only". This seems to come down to fundamental properties of the downstream system, with no grey area. Certainly, even downstreams that can handle duplicates would still be interested in caching/rate-limiting/etc to control the workload, but that's not the same thing as asking for final results only.

Regarding the paragraph that starts, "With regards to "point in time" queries...", I think maybe I wasn't clear. Using the taxonomy you clarified, I was talking about "batch results back"-type queries (I issue the query and get a single tabular result, and that's it, just like traditional RDBMS). It still makes sense to issue such a query, and only want to see "final" results for a windowed aggregation. Consider: as a human being, I want to issue an ad-hoc query to find out the answer to the example that you led with. I'm not writing an application, I just need to get an answer to this specific question right now. Still, I don't want to get false positives from incomplete data, so I wish to only query over closed windows. Further, note that I might be querying a persistent object that I didn't create (a table created by another query), so I might not have a-priori knowledge of the exact structure of the window definition and therefore am not in a good position to scope the query itself to only cover closed windows.

Thanks for the link to https://arxiv.org/pdf/1905.12133.pdf . What I actually meant was how they define the concepts analogous to EMIT FINAL and grace period. I think it would be a good idea to augment the proposal with such comparative references.

Finally, regarding grace period: This is correct, grace period is a property of the window. If KSQL is using the default from Streams, note that the default is 24 hours. This means that if you run an EMIT FINAL continuous query, you would see no output at all for a full day. This has been another painful experience from the Suppress feature. I documented this sharp edge heavily, and mentioned it in the blog post, and still, approximately everyone cuts themselves on it. And that is even with the situation that people can change the grace period, but don't know that they should. I highly recommend considering specification of the grace period to be part of this KLIP. Maybe also setting a different default, or possibly requiring it to be specified.

Copy link
Contributor

@big-andy-coates big-andy-coates left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Love this @agavra and I think our users will to!

Some quick thoughts from reading through:

  1. As @vvcephei mentioned, the default grace period is 24 hours, so we'll need a way of users defining the grace period for a windowed table. This could be another KLIP, but will need doing before / at the same time, as EMIT FINAL is pretty useless without it. BTW, there's also a bug we'd be fixing: if you create a window larger than 24 hours it currently blows up. (There's a git hub issue for that somewhere).
  2. I'd with @vvcephei on keeping it simple initially. I think just looking to add EMIT FINAL initially would give most benefit. We can then look at a follow up KLIP to add rate limiting via EMIT if there is call/need e.g. EMIT AFTER [duration] [time_type]. As @vvcephei mentioned, what type of time is important and it may not make sense to implement this until we have wall-clock time.
  3. At the moment pull queries are implicitly EMIT FINAL. You'll see this in the code once you start working on this. When we add support for EMIT FINAL to our SQL syntax we should allow, (and ignore), EMIT FINAL on pull queries, i.e. this should be in scope. So SELECT * FROM X WHERE ROWKEY=x; is equivalent to SELECT * FROM X WHERE ROWKEY=x EMIT FINAL. Why? Because the user only wants the final result for the row.

Let me know once you've updated the KLIP and I'll take another look!

@mjsax
Copy link
Member

mjsax commented Nov 27, 2019

At the moment pull queries are implicitly EMIT FINAL. You'll see this in the code once you start working on this. When we add support for EMIT FINAL to our SQL syntax we should allow, (and ignore), EMIT FINAL on pull queries, i.e. this should be in scope. So SELECT * FROM X WHERE ROWKEY=x; is equivalent to SELECT * FROM X WHERE ROWKEY=x EMIT FINAL. Why? Because the user only wants the final result for the row.

Should this not be invalid syntax? EMIT FINAL only makes sense for windowed-aggregations hence, if there is not WINDOW BY clause, EMIT FINAL should not be allowed at all? Treating both queries as equivalent does not make sense to me, because EMIT FINAL is rather a property of the window/aggregation operator within the query, but not the query itself from my point of view.

@big-andy-coates
Copy link
Contributor

@mjsax

From the discussion in KLIP-8, the proposition was that for a given query, e.g.

SELECT * FROM FOO WHERE ROWKEY=x;

(Where FOO is a table thats computing an aggregate, e.g. CREATE TABLE FOO AS SELECT COUNT() FROM BAR GROUP BY ROWKEY;)

You can either request the changes of how the final result was built, using EMIT CHANGES, or only want the final result, using EMIT FINAL, with EMIT FINAL being the default if none explicitly supplied.

EMIT CHANGES would output the stream of CDC events of the history of how the count, for this key, changed over time. (Obviously, key compaction will delete history, but that's configurable).

EMIT FINAL would get the current final result for the key. (Either from a state store, or potentially from reading the source topic and computing the aggregate. Using the state store is just an optimisation).

So in this sense both EMIT CHANGES and EMIT FINAL are valid.

Of course, that's not taking into account KLIP-11, which proposing using SELECT * FROM STREAM(SELECT * FROM FOO WHERE ROWKEY=x);.

However, this klip currently stands alone, or rather sits on top of klip-8. As such the EMIT FINAL is actually implicit for pull queries. Of course, we can decide if that should continue to be the case, but I think that is better discussed in klip-11.

@mjsax
Copy link
Member

mjsax commented Nov 30, 2019

Oh, I see. There was some misunderstanding from my side what was meat by EMIT FINAL for pull queries. However, I think that there is another misunderstanding (more below).

What I am really wondering though is, how current pull queries implement "emit final". I am also wondering about this statement

EMIT FINAL would get the current final result for the key

"current" and "final" seem to contradict each other. My understanding was (maybe it's incorrect), that KSQL implements pull queries via IQ and thus, if a user does a lookup for a key (ie, a windowed-key in our example), the pull query would go against the current table state and there is no notion of "final" for this case. Kafka Streams does not provide any means to know from outside if the retention time passed: hence, how do you know that you can or cannot return the result of such a pull query, if you don't know if the result is final or not?

The only way to implement "final pull queries" (and AFAIK KSQL does not do it this way atm) would be to use suppress() and query the suppress result (instead of the result table of the window-aggregation that is the input to suppress()). However, the result KTable of suppress() is not materialized into a store -- and KS does not even allow so materialized it expliclity, because a changelog stream can only be materialized into a KV-store atm, but not a windowed store). Hence, atm only the full output changelog topic of suppress() could be scanned to process an "emit final pull" query. (Or KSQL falls back to the Processor API to upsert the result of suppress() into a windowed-store.)

Furthermore, this example actually illustrates a bigger issue of the KLIP-8 design (that KLIP-10 inherits) and also shows the misunderstanding that I mentioned above. If I understand you correctly, there are two types of queries atm:

SELECT * FROM FOO WHERE ROWKEY=x; // pull query; implicitly EMIT FINAL

SELECT * FROM FOO EMIT CHANGE; // push query
// emits a stream with all intermediate updates
// (minus record caching de-duplication) as result

However, how can one get a result STREAM, that does only contain the final result per window. One cannot say

SELECT * FROM FOO EMIT FINAL;

because this would be pull query... Note that the main purpose do suppress() is to support this last use-case. In Kafka Streams, "emit final" in suppress() does still describe a streaming/push query, not a IQ/pull query.

Given my current understanding how KSQL works, EMIT FINAL would actually need to be a "property" of the query that populates FOO:

CREATE TABLE FOO AS SELECT COUNT() FROM BAR GROUP BY ROWKEY WINDOW BY... EMIT FINAL;

Only if EMIT FINAL is specified on this query (what is not supported atm AFAIK), the result TABLE would only contain final result (because now FOO would be the output of suppress() but not the input) that could be queried with by the pull query as shown above. If there is no suppress() inserted into the CTAS for FOO, and if the pull query makes the decision about "emit final" it will be pretty difficult to implement... (basically the pull query would need to know how suppress() work, read the output changelog topic, and applies the suppress() logic to find the final result in the changelog stream; overall a pretty expensive operation).

This is what I meant by "emit final" is a property of the operator, but not the query. A windowed-aggregation can still be a CQ, but only emits a single result per window -- however, "emit final" does not make the CQ a "lookup query". However, KLIP-8/KLIP-10 propose that "emit final" is for pull queries while "emit change" is for push queries---this approach seems to fall short.

@vpapavas
Copy link
Member

vpapavas commented Dec 2, 2019

However, how can one get a result STREAM, that does only contain the final result per window. One cannot say

SELECT * FROM FOO EMIT FINAL;
because this would be pull query... Note that the main purpose do suppress() is to support this last use-case. In Kafka Streams, "emit final" in suppress() does still describe a streaming/push query, not a IQ/pull query.

I think you are not right here. EMIT FINAL is a push query. It means continuously stream results at window close.

@mjsax
Copy link
Member

mjsax commented Dec 3, 2019

I think you are not right here. EMIT FINAL is a push query. It means continuously stream results at window close.

I guess that is the current open question... What does EMIT FINAL actually mean? @big-andy-coates seems to have a different point of view to yours:

As such the EMIT FINAL is actually implicit for pull queries.

Or do I misunderstand what @big-andy-coates says?

I am confused atm who claims what... If you have an windowed aggregation persistent push query, you can emit the state store update to the result topic either continuously (ie, for every update) or once (ie, "final" via suppress()). Similarly, you can query the result table state store and want to query any intermediate window result or only final window result. Ie, it's basically 4 types of queries...

So far, KSQL only supported continuously pushing all intermediate result into the result table. For the new pull queries, it seems that they would query any intermediate window result, too (correct me if I am wrong).

It's unclear to me what EMIT FINAL is supposed to mean? And for whatever definition one suggests, how do you express the "other" queries? Would be good to get examples down for all types of persistent/push/pull queries for the different proposal. Atm, I cannot make much sense out of it.

@vvcephei
Copy link
Member

vvcephei commented Dec 3, 2019

I just thought of this again while reading over some other streaming SQL papers. Maybe something like the following matrix would clear it up?

  • [] saying nothing implies a pull query aka "give me back a tabular view of the query result, including the latest state for all windowed aggregations"
  • EMIT CHANGES - a push query, aka "give me back a stream view of the query result"
  • EMIT FINAL - a pull query aka "give me back a tabular view of the query result, only including the results for closed windows"
  • EMIT CHANGES FINAL - a push query - aka "give me back a stream view of the query result, only including the results for closed windows"

... or have I gotten myself confused?

@agavra
Copy link
Contributor Author

agavra commented Dec 3, 2019

I think we've dug ourselves into a little bit of a hole with KLIP-8, and now that we have much more experience with what we want we can define things much better. That being said, I think we should hold off this KLIP until #3799 (KLIP-11) is in a stable state (perhaps not necessarily implemented).


And now I'm going to contradict myself and offer some more thoughts 😂

I wrote this KLIP with the intention that EMIT is the keyword that indicates that a query should be a push query. EMIT CHANGES is a push query that does not use suppress() while EMIT FINAL uses suppress(). If it's not there, it uses IQ.

@vvcephei since streams doesn't really implement a "suppress IQ", I didn't think about the case that you have listed for EMIT FINAL (pull with only final results). I agree with @mjsax that this should be defined as a property of the table you are querying (i.e. defined by the query that populates that table) not by the pull query itself.

This indeed does differ from what KLIP-8 suggests, and perhaps I could have prevented lots of confusion by pointing that out.

@agavra
Copy link
Contributor Author

agavra commented Dec 6, 2019

Linking issue for reference: #1030

@mjsax
Copy link
Member

mjsax commented Dec 6, 2019

Thanks for the input @vvcephei and @agavra -- that clarifies a lot!

While also see a dependency with regard to KLIP-11, I am still wondering if both KLIPs could make progress in parallel? What is the importance of KLIP-8 with regard to timeline? Even if we might change some syntax later, the internal implementation would most like not change?

@apurvam apurvam added the design-proposal Tag KLIP Prs with this label label Jan 2, 2020
Copy link
Contributor

@derekjn derekjn left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM!

Copy link
Contributor

@big-andy-coates big-andy-coates left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

@agavra agavra merged commit 50ef576 into confluentinc:master May 22, 2020
@agavra agavra deleted the klip-10 branch May 22, 2020 17:22
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
design-proposal Tag KLIP Prs with this label
Projects
None yet
Development

Successfully merging this pull request may close these issues.

8 participants