Skip to content
This repository has been archived by the owner on Mar 16, 2022. It is now read-only.

WIP - Eventing support #353

Closed
wants to merge 3 commits into from
Closed

WIP - Eventing support #353

wants to merge 3 commits into from

Conversation

jroper
Copy link
Member

@jroper jroper commented Jun 10, 2020

No description provided.

Comment on lines 34 to 42
function handleItemAdded(itemAdded, ctx) {
ctx.forward(products.UpdateCartQuantity, {
productId: itemAdded.item.productId,
userId: ctx.cloudevent.entitykey,
quantity: itemAdded.item.quantity,
});
}
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is an example of a typical local projection - it simply forwards the event to another entity.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nice and clear semantics :)

//
// Metadata attached to a reply will only be propagated if the underlying protocol supports per reply metadata.
// For example, message brokers may support this, but gRPC does not.
cloudstate.Metadata metadata = 2;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the qualifier can be omitted.


Not all events in the internal model will necessarily map to events in the external model. For example, a business decision might be that a users password might be the domain of the user management service and the user management service alone. When a `PasswordChanged` event is emitted from the event log, no event should then be published externally. Effectively, the `PasswordChanged` event must be filtered from the event stream.

The challenge in acknowledgement here is when should the `PasswordChanged` event be acknowledged, and how should the user function indicate this? Since no event is being output, the output of an event cannot be used to indicate that the `PasswordChanged` event should be acknowledged. Rather, it's the absence of an output event that should indicate that a `PasswordChanged` event should be acknowledged. But how do we differentiate between on output event, and slow processing of the input event? Does the Cloudstate proxy say "well, you haven't output anything for 10 seconds, so I'm going to acknowledge?". This puts a 10 second latency on the processing of all `PasswordChanged` events, so that solution doesn't work. Do we have a specific ack signal, which accompanies no event?
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Consequently, the filtering of the event then could be the indicator for an acknowledgement then? where "filtered out" means "this input event does not produce an output event". I'm not sure how to model that. Using metadata is perhaps awkward but assuming having a silent-acknowledgment after n seconds same so.

Could the proxy determine even before he sends an event to the user function if an event will not lead to an output event? PasswordChanged could be modelled so that the proxy can assume an acknowledgment on the successful reception by the user function. This also leads to the question of, when does the proxy know about a successful reception? An out of band ack-signal might be still better. If traffic does not count, immediately sent to the proxy or coalesced a bit later.

In general, what happens if an event is slowly processed, say >10 seconds? Is it assumed to be timed out at 10+n seconds? The same as the incoming event can be resent, described in "Problem Statement", this could lead to a retry by the proxy if the user function does not "respond" within a reasonable amount of time.

I think, without the proxy knowing that an input event for a certain user function does not lead to an output event (modelled by the event type itself), the user function might have to acknowledge the event by an out-of-band signal or a "not-to-be-published" output event sent explicitly.

Copy link
Contributor

@sleipnir sleipnir Jun 10, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If I may comment here I think we should have an ack signal that does not produce events but that is useful for the user's knowledge of what is happening and I think there may be ack strategies that the user can use, so regardless of the strategy used the fact is that the user you will have explicitly agreed with it and will be aware of the system's behavior. By strategy we could for example have strategies defined by windowning, batch or specific user timeouts

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

btw... I initially wrote this document just to collect my own thoughts. I didn't finish it, and it's just a raw collection of thoughts. When I created this PR, I noticed the document was there, and thought well maybe it'll be useful to help others understand the context.

So, the solution I ended up going with was the following:

  • If all you want to do is emit 0-1 events, then you can define unary gRPC calls, and respond with a reply if you want to emit one event, or with no reply for zero. The ack signal is the same in either case - the response, either containing a reply or not.
  • If you want to emit 0-many events, then you can define a server-side streamed gRPC call, where the event is sent as the single input, and you can send 0 to many replies in gRPC response, followed by closing the stream. The close of the stream signals the acknowledgement.
  • If you define a client streamed gRPC call (either one direction or duplex), it will treat it the same as a server side streamed gRPC call, it will only send one event into the client stream, and consider termination of the output stream, or redeem of the output unary value, as the acknowledge event.
  • Finally, you can also emit zero to many events from unary calls using effects that forward to other commands with an output handler, though right now, if they fail to be sent, that won't cause a failure on the stream so the event will be acknowledged. We need to decide whether synchronous events should impact the result if they fail (currently they don't, the processor does wait for them to complete, but ignores the success/failure result), or if we need a new semantic to indicate "do this effect, and if it fails, fail the whole command".

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@jroper I agree with this line of thinking. I personally believe that explicit acking is going to be a recipe for a lot of bugs, so steering clear of that seems like a big win.

Comment on lines 34 to 42
function handleItemAdded(itemAdded, ctx) {
ctx.forward(products.UpdateCartQuantity, {
productId: itemAdded.item.productId,
userId: ctx.cloudevent.entitykey,
quantity: itemAdded.item.quantity,
});
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nice and clear semantics :)

},
get entitykey() {
return metadata["ce-entitykey"];
},
Copy link
Member Author

@jroper jroper Jun 12, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

entity key actually isn't a Cloudevent property, I just introduced it here to carry our entity key. I had originally actually proposed an event key parameter for cloudevents:

cloudevents/spec#209

This ended up being renamed to partitionkey, as the idea was that it would be used for partitioning in Kafka and similar partition supporting message brokers:

cloudevents/spec#218

As such, while the partition key is almost always going to be the entity key, the partition key is not the right field to pass an entity key when what you need is the entity key, because they can differ.

All that said, sometime after I proposed the event key feature, a new field, subject, was added to cloud events:

cloudevents/spec#406

Looking at that, and also reading through the motivation and concrete examples in cloudevents/spec#112, I think that field is an appropriate field to pt the entity key in. So I'm going to delete entity key and use subject instead.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@jroper Yes, "subject" seems very fitting for our purposes here.

@jroper
Copy link
Member Author

jroper commented Jun 12, 2020

Here's a demo:

https://youtu.be/lan5Awqy-xI

@sleipnir
Copy link
Contributor

Great work @jroper


However, doing this can be dangerous, and lead to deadlocks. The reason being that typically, the number of unacknowledged events must be limited because unacknowledged events require buffering to track, so that once the head of the queue gets acknowledged, the rest too can be acknowledged. This does depend on message broker though, for message brokers that track acknowledgements to each message independently, it may be fine, but for offset based message brokers like Kafka, it's a problem. Consider the case where a single entity may emit a `UserCreated` and then an `EmailChanged` event, but concurrently, many other entities emit events, so between those two events in the stream for all user events there may be tens, hundreds or thousands of events. If the number of events interleaved here is greater than the configured max outstanding events, then a deadlock will be reached, where the `UserCreated` event can't be acknowledged because the `EmailChanged` event has not yet been received, but the `EmailChanged` event is not being received because the limit of unacknowledged events has been reached, which is being blocked by the waiting `UserCreated` event acknowledgement.

For this reason, such stateful folds are best done with a persistent save point in the middle, such that the first event can be acknowledged immediately. In this case, what we are essentially doing is a filter with a side effect to a persistent store - when we receive the `UserCreated` event, we persist that we've received it, and emit no event, then when we receive the `EmailChanged` event, we load the persisted state that was caused by the `UserCreated` event, and emit the corresponding folded event.
Copy link
Contributor

@viktorklang viktorklang Jun 12, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think things like this quickly devolve into CEP territory—and should probably be handled using some form of temporal query language.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@viktorklang I remember we talked about it a lot in the past. I think it would be possible to build this on top of that implementation and maintain both approaches in the future. Wdyt?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@sleipnir It's a tricky path to walk—power&flexibility vs performance&satefy (performance is possible due to making assumptions)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@viktorklang Yes. I know it will not be an easy journey.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've been thinking a fair amount about this - if a particular projection can be feasibly described using a query DSL (eg, sql like, or maybe even graphql), then most likely, that projection wouldn't be defined on the event schema, but a state schema. ie, to define the projection, you would first need to define event handlers that translated the event handlers to some state object, then you could write your query DSL over the schema of that state object.

To explain, let's say a user has a list of email addresses, and you have a projection of email addresses to users. Your event model has EmailAddressAdded and EmailAddressRemoved events. I don't think there's any way to use SQL to model the users by email projection over those events. Rather, would write code that applied those events to a User(id: String, email: Set[String]) model, and then you can definitely use an SQL-like language to define the projection that User model.

Having to do this for each projection would effectively nullify the benefits of using a high level query language, since you'd still have to write code to handle the events, and in many cases, the code would be simpler if you didn't go through the intermediate state representation anyway.

Now, users have already defined handlers for their events, in the entity itself. One of the principles of CQRS is that read side (including the projections) should not read the state model of the write side - but I'm going to challenge that. Yes, there are advantages of separating the two, but it's not an all or nothing thing, it's not like the moment the projection reads the write side state, that you lose all advantages of CQRS and there's no point in doing CQRS at all. It's a trade off. There is a cost associated with reading from the write side, and there is value from doing it too. And I think, in many cases, the value far exceeds the costs - if you can define a projection simply by specifying an SQL statement on your write side state model, that has a huge amount of value. And what's the cost? Your write side state model can't be evolved without modifying your projection? That's a small price to pay. They can't be scaled independently? Typically they'll live in the same service consuming the same database, their independent scaling ability was already very limited. You can't swap out your write model for something else in a different database etc? Well, you can, but to do it you'll just need to keep the event handlers from your old write model around so that your read side projections can still use them - the result of which is that you've just delayed having to write two sets of event handlers until you actually needed two sets of event handlers, which is a good thing, don't duplicate until there's a need to.

Now there are two possible approaches to defining a projection on the write side state, they differ in which state is used to define the projection. One is that the state corresponding to the event being handled is used, the other is that the current state on the write side is used.

The first approach requires each projection to store the state itself. When an event is emitted, the current state for that projection is loaded, the event is applied to it using the write side event handlers, the projection handler is then run against the state, and then the state is persisted. This means storage of the state is duplicated for every single projection, which is a major downside of this approach. However, this approach also has advantages, if the state model does end up needing to diverge between the projection and the write side, that's easy to do, because the projection actually isn't sharing state with the write side, it's only reusing the event handler code to maintain its own state, which because it's the same code makes the state identical, but not shared, so at some point, you can effectively fork the projection from the write side, by duplicating the event handlers, and evolving them separately.

The second approach is, when an event is emitted, ignore the event itself, and instead read the current state from the write side, and apply that to the projection. There are a few advantages to this:

  • No duplication of write side state, so no wasted storage.
  • New projections can be easily bootstrapped by iterating through all (or a subset of) the current entities, rather than having to replay the entire event log from the beginning, which in some cases offers massive performance gains to adding new projections.

The disadvantages:

  • The projection is strongly coupled to the write side - to decouple will probably require rebuilding the projection.
  • The projection can temporarily lose causal consistency, since it's operating on the current state, not the state of the entity as it was after the event was applied. Note that it will still be eventually consistent, but state updates can be applied to it out of order.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think a really common use-case is to be able to dynamically reorder a projection (think "SORT BY lastName ASC|DESC") as well as consuming parts of a projection (OFFSET + LIMIT) etc.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wouldn't describe that as "reordering a projection", I don't think "order" is a property of a projection, I'd rather say some projections are built using indexes that support ordering.

On the topic of ordering and retrieving multiple results - this is where the Cloudstate model doesn't currently have an answer. All of our state models, both current and proposed, are modelled around commands on single entities - a command has an entity key, and the value for that key is loaded through the state model mechanism. This model allows Cloudstate to model the state in a very opinionated way, always passing it to the user code instead of the user code requesting it. But as soon as you get to multiple values, there's no longer an inherent way to identify those values, a single entity key doesn't cut it, nor do multiple entity keys cut it because many queries, you don't know what the entity keys are before you get the values (that's the point of projections). I don't think we can model this in any way other than having user code request it. Also for these types of queries, often you will have multiple indexes ad-hoc criteria for filtering, this is where abstracting the database will cause us problems - we'll have to create our own query language if we want to do that, and we don't want to do that. That's where I think there will be use cases for user code to speak directly to databases using that databases query language.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's where I come back to the thoughts around exposing query languages for projections, which means that we'd much cheaper be able to support different kinds of projections (graph, relational, text, etc)



## Developer experience

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This space intentionally left blank? :)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yep - we don't do developer experience.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Re-think please :)

let serviceName, commandName;
// We support either the grpc method, or a protobufjs method being passed
if (typeof method.path === "string") {
const r = new RegExp("^/([^/]+)/([^/]+)$").exec(method.path);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are RegExps not reusable?

Copy link
Member Author

@jroper jroper Jun 18, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah it should be pulled out into a constant, but not for performance reasons, v8 caches compiled regexes:

> console.time("1"); new RegExp("^/([^/]+)/([^/]+)$").exec("foo/bar"); console.timeEnd("1");
1: 0.108ms
undefined
> console.time("2"); new RegExp("^/([^/]+)/([^/]+)$").exec("foo/bar"); console.timeEnd("2");
2: 0.011ms
undefined

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Interesting!

* limitations under the License.
*/

function Metadata(entries) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@jroper Should this perhaps normalize entries to avoid semantically equivalent keys but added with different casing?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The semantics of keys are dependent on the transport. In HTTP, they are case insensitive. In other transports, they might not be.

My aim here has been to ensure that the user experience is smooth for as wide a range of use cases as possible. One of the most common places to use metadata is HTTP, and HTTP header names are case insensitive. For transports where header names are case sensitive, I think it would be very, very unusual if those transports ever defined multiple header names that differed only by case. So, given this, I've made all lookups case insensitive, this satisfies at least 99% of use cases, it gives a good experience in HTTP, and for case sensitive transports, it shouldn't matter whether the lookups are case insensitive if keys almost never differ by case. Of course, in the once in the history of the universe chance that there will be keys that differ only by case, you can still iterate through the entries to differentiate between them, and you get the key in its original case.

For setting metadata - I preserve the case passed. Plus, multiple values for a single case are supported here, so this always adds the value in addition to old values, there's no replace operation. If you want to replace a key, you'll have to delete followed by add. Maintaining case when setting ensures that things work both in the case sensitive and case insensitive scenarios. It's also how most HTTP/1.1 clients and servers generally work, even though http header names are case insensitive, they always preserve the case passed to them by users.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think key is to make sure that it's difficult to mess things up, and if things get messed up, it's easy to diagnose and remedy. Having effectively a "bag"-semantics I think is rather error prone, wouldn't you agree?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's technically not bag semantics because insertion order is maintained.

But yes, I agree it is error prone - but if you don't like that, the person to take it up with is Tim Berners-Lee. The error proneness doesn't come from what we're providing, it is inherited from what we are supporting. We're supporting HTTP as a transport protocol. HTTP headers have bag-like semantics, which is error prone, supporting HTTP headers means inheriting those error prone semantics. If we tried to fix it, the result would be a metadata structure where certain valid HTTP messages couldn't be expressed. And given that metadata is an escape hatch into the transport protocol, it wouldn't make sense to me if the escape hatch only got you half way out.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Talking to Tim, brb.

Comment on lines +26 to +27
int32 total_quantities = 1;
int32 total_carts = 2;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should this perhaps be uint32?

Copy link
Member Author

@jroper jroper Jun 18, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, never use uint32 or uint64 in APIs. Google recommends strongly against using them. The reason for this, is that they can overflow in languages that don't support unsigned integer types, like Java, where uint32 gets generated as an int and uint64 gets generated as long, so uint has the potential to cause compatibility issues.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

interesting, thanks.

Comment on lines +14 to +18
option (.cloudstate.eventing) = {
in: {
event_log: "shopping-cart"
}
};
Copy link
Contributor

@viktorklang viktorklang Jun 17, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could this be option (.cloudstate.eventing).in.event_log = "shopping-cart"?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah I think so.

@@ -26,5 +26,5 @@ option java_package = "io.cloudstate";
option go_package = "github.com/cloudstateio/go-support/cloudstate/;cloudstate";

extend google.protobuf.FieldOptions {
bool entity_key = 50002;
bool entity_key = 1080;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Perhaps add a comment here that this is a registered option?


message EventDestination {
oneof destination {
string topic = 1;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this a oneof due to compatibility concerns?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's for symmetry with EventSource, which has a oneof too. Just as there are currently multiple different types of event sources, we may have multiple different types of event destinations in future, not just topics. By making this a oneof, anything that reads this knows that if they want to be forwards compatible, they should know that topic will not always be set.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok

}

extend google.protobuf.MethodOptions {
Eventing eventing = 50003;
Eventing eventing = 1081;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Perhaps document/comment that this is a registered number?

option go_package = "github.com/cloudstateio/go-support/cloudstate/;cloudstate";

extend google.protobuf.FieldOptions {
bool legacy_entity_key = 50002;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Perhaps add a comment on when this is likely removable?

repeated MetadataEntry entries = 1;
}

message MetadataEntry {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Document as to what the behavior/semantics of this is? Are duplicate keys allowed? What happens if they are? :)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Behavior/semantics of metadata are undefined at the Cloudstate protocol level, this is for passing transport specific metadata, and so correspondingly the semantics are 100% dependent on whichever transport this metadata has come from or is going to.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've added detailed docs to the spec on this now.

@@ -45,17 +56,67 @@ message FunctionReply {
Forward forward = 3;
}

repeated SideEffect side_effects = 4;
repeated SideEffect side_effects = 5;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good question. Not sure.

@jroper
Copy link
Member Author

jroper commented Oct 15, 2020

I'm going to look at reviving this, but in another PR.

@jroper jroper closed this Oct 15, 2020
@jroper jroper mentioned this pull request Oct 16, 2020
Sign up for free to subscribe to this conversation on GitHub. Already have an account? Sign in.
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants