
[CloudEvents] CloudEvents for event bus events #77

Closed
10 tasks done
robrap opened this issue Jul 12, 2022 · 30 comments
Assignees
Labels
event-bus Work related to the Event Bus.

Comments

@robrap
Contributor

robrap commented Jul 12, 2022

Acceptance Criteria:

  • Implement and document (code comment or ADR referring to OEP) final CloudEvent headers.
    • ce_datacontenttype (content-type?)
    • ce_id
    • ce_source (don't include workers vs webapp if challenging)
    • sourcehost (extension)
    • ce_specversion
    • ce_type (Done)
    • minorversion (extension) (Deferred)
    • ce_time (Deferred)
  • Add logging of headers for producer and consumer error logging.
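The header set above could be assembled roughly like this. This is a hypothetical sketch of binary-mode CloudEvents headers for Kafka: the helper name, encoding choices, and example values are illustrative, not the actual event-bus-kafka implementation.

```python
import uuid

# Hypothetical sketch: build the ce_-prefixed Kafka message headers listed
# in the acceptance criteria (binary content mode). Header names follow the
# CloudEvents Kafka protocol binding; the helper and values are made up.
def build_ce_headers(event_type, source, content_type="application/avro"):
    return [
        ("ce_id", str(uuid.uuid4()).encode("utf-8")),
        ("ce_source", source.encode("utf-8")),
        ("ce_specversion", b"1.0"),
        ("ce_type", event_type.encode("utf-8")),
        ("ce_datacontenttype", content_type.encode("utf-8")),
    ]

headers = dict(build_ce_headers(
    "org.openedx.learning.course.enrollment.created.v1",
    "/openedx/lms/web",
))
```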

Notes:

From https://github.com/openedx/openedx-events/blob/main/docs/decisions/0005-external-event-schema-format.rst:

OEP-41 Asynchronous Server Event Message Format also dictates the use of the CloudEvents specification. Combined with this ADR, we would be required to adhere to the CloudEvents Avro Format. There may also be additional CloudEvent related work tied to a particular protocol binding, like the Kafka Protocol Binding for CloudEvents. This, however, is out of scope of this particular decision.

Our current approach has been to generally follow the CloudEvents spec as we understand it, where and when we need to make a choice. However, we have not devoted attention to ensuring that the implementation is complete or strictly correct. It is unclear how to do so, or what the benefits of such an effort would be at this time.

For current references, see:

The purpose of this issue is to communicate the current approach. If someone wants to put in more effort to review/implement any gaps around this, that help is welcome.

FYI: @ormsbee: Since you were the author of the OEP. Note: if you have no issues with this, we can close this issue.

@robrap robrap added the event-bus Work related to the Event Bus. label Jul 12, 2022
@robrap robrap added this to the [Event Bus] Future milestone Jul 12, 2022
@robrap robrap added the backlog To be put on a team's backlog or wishlist label Jul 12, 2022
@robrap robrap moved this to Todo in Arch-BOM Jul 12, 2022
@robrap robrap added this to Arch-BOM Jul 12, 2022
@ormsbee

ormsbee commented Jul 13, 2022

Right, I proposed using CloudEvents because there was no sense in reinventing the wheel, but I don't know what validation would be necessary to ensure a fully correct implementation. I don't think it's the most pressing concern–validation would be nice if it's cheap to implement, but I wouldn't prioritize it over the other work needed to get a functioning system solving real use cases end-to-end.

This is based on my assumption that the spec is small and flexible enough that it won't require overly invasive changes even if we do end up having to make some fixes to our output later.

@feanil

feanil commented Jul 13, 2022

I generally agree with both of you here. We should follow CloudEvents as best we can, but given that we'll have our own well-defined (and more precise) schema validators for each event type, we should use those for validation where possible.

@robrap
Contributor Author

robrap commented Jul 20, 2022

Thanks @ormsbee and @feanil. To add some additional clarity, here are the required attributes according to the spec: https://github.com/cloudevents/spec/blob/v1.0.2/cloudevents/spec.md#required-attributes. I am guessing we don't have them all, and our team is prioritizing other work over reading up on this, discussing how this should be implemented, etc. This is an FYI in case anyone else wants to take up this work.

@ormsbee

ormsbee commented Jul 20, 2022

FWIW, there are four required fields listed there (id, source, specversion, type), all of which have behaviors defined for them in the fields section of OEP-41.

@robrap
Contributor Author

robrap commented Jul 20, 2022

Thanks for the reminder @ormsbee. I'm sure I've read those docs and forgotten about them just as many times. I appreciate that you have defined some of these for us. I can imagine implementing at least some of the missing fields once we work on observability.

In my defense, there is OEP-41, the CloudEvent spec, CloudEvent for Avro spec, and the CloudEvent for Kafka spec, and it was easy to get lost among these in the past. Sorry I forgot about the OEP-41 interpretations.

@feanil

feanil commented Jul 28, 2022

Given that OEP-41 already provides good starting values, some of which can be easily generated and others of which will only make it easier to resolve issues, I recommend spending some time-boxed effort to just add these in early rather than defer them to later.

@robrap
Contributor Author

robrap commented Jul 28, 2022

We'll look into what is clearly defined and easy to implement as part of the observability work.

Originally, I thought the CloudEvent attributes belonged in the data. Then, after reading the kafka-protocol-binding for CloudEvents doc, I thought this data lived in the header with special prefixes. Now, I think that they belong in the data, but that there are special keys where this data is duplicated in the header? I'm still not clear.

@robrap robrap changed the title CloudEvents for event bus events [Observability] CloudEvents for event bus events Aug 9, 2022
@robrap robrap changed the title [Observability] CloudEvents for event bus events [CloudEvents] CloudEvents for event bus events Nov 15, 2022
@robrap
Contributor Author

robrap commented Nov 15, 2022

Additional notes:

  • When someone gets to this, we may want different tasks for different attributes. Some may be simpler than others.
  • I think the timestamp was documented as needing to match the date modified timestamp of the db record, if there is one, which may take a little plumbing and thought? This is an example of something that might get split out.
  • I still have the data vs header vs both question from this comment: [CloudEvents] CloudEvents for event bus events #77 (comment)

@robrap
Contributor Author

robrap commented Nov 17, 2022

@ormsbee: The OEP has the following text:

Producers MUST ensure that source + id is unique for each distinct event.

The id is supposed to be a UUID. I think I'm missing something about what this statement means.

UPDATE: Oops. This is from the CloudEvents spec, and not from the OEP. Dave - you may not have the answer in this case, but maybe you understand this?

@robrap
Contributor Author

robrap commented Nov 17, 2022

More notes/questions:

  • The OEP has more required fields than the CloudEvents spec. See https://open-edx-proposals.readthedocs.io/en/latest/architectural-decisions/oep-0041-arch-async-server-event-messaging.html#fields
  • The time field is more complicated if we want it to reflect the DB date modified date. Can we break this out, or just default to now() to start?
  • If we determine that these fields are meant to be in the event itself, in addition to the header, can we skip that? The header is much simpler to manipulate. The event data requires changing the Avro Schema. Can anyone actually determine where this data is meant to live based on the earlier comments?
  • Can we ensure that the id, and any other details, make it to the error logs?

@robrap robrap removed the backlog To be put on a team's backlog or wishlist label Nov 17, 2022
@ormsbee

ormsbee commented Nov 17, 2022

@robrap:

@ormsbee: The OEP has the following text:

Producers MUST ensure that source + id is unique for each distinct event.

The id is supposed to be a UUID. I think I'm missing something about what this statement means.

The CloudEvents spec doesn't strictly require that id be a UUID. The only CloudEvent requirement is that it is not empty and it is unique within the scope of the producer (the source field). Its main purpose is to be able to detect duplicate messages so that they're handled properly.

So for CloudEvents, it's legal for one message to have {'source': '/openedx/studio/web', 'id': '100'} and another to have {'source': '/openedx/lms/web', 'id': '100'}. Those are valid IDs and they should be treated as separate messages (not duplicates). In practice though, it's really common to use UUIDs, which is what OEP-41 prescribes for our usage of CloudEvents. In that case, we get source-relative uniqueness for free, since we expect every UUID to be globally unique anyway.

I tried to talk about this a bit in the id and source field descriptions in the OEP.
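The uniqueness rule being described can be sketched in a few lines: duplicate detection must key on the (source, id) pair, not id alone. This is a toy in-memory check for illustration; a real consumer would use durable storage.

```python
# Toy sketch of CloudEvents duplicate detection: id is only required to be
# unique within a source, so the dedup key is the (source, id) pair.
seen = set()

def is_duplicate(event):
    key = (event["source"], event["id"])
    if key in seen:
        return True
    seen.add(key)
    return False

# The same id under different sources counts as two distinct events:
assert not is_duplicate({"source": "/openedx/studio/web", "id": "100"})
assert not is_duplicate({"source": "/openedx/lms/web", "id": "100"})
# A repeat of an already-seen (source, id) pair is a duplicate:
assert is_duplicate({"source": "/openedx/lms/web", "id": "100"})
```

With UUIDs, as OEP-41 prescribes, this per-source uniqueness falls out for free, since every UUID is expected to be globally unique anyway.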

@robrap
Contributor Author

robrap commented Nov 17, 2022

The CloudEvents spec doesn't strictly require that id be a UUID.

Thanks for clarifying @ormsbee. I didn't notice that UUID was under an "Examples" section in the CloudEvents spec, and I didn't realize that it was the OEP that decided on a UUID (which was in my memory about the field).

@ormsbee

ormsbee commented Nov 17, 2022

The OEP has more required fields than the CloudEvents spec. See https://open-edx-proposals.readthedocs.io/en/latest/architectural-decisions/oep-0041-arch-async-server-event-messaging.html#fields

Yeah–I thought minorversion and sourcehost would be useful for debugging.


The time field is more complicated if we want it to reflect the DB date modified date. Can we break this out, or just default to now() to start?

The CloudEvents spec for time states:

Description: Timestamp of when the occurrence happened. If the time of the occurrence cannot be determined then this attribute MAY be set to some other time (such as the current time) by the CloudEvents producer, however all producers for the same source MUST be consistent in this respect. In other words, either they all use the actual time of the occurrence or they all use the same algorithm to determine the value used.

So to be spec-compliant, my sense is that we have three broad paths:

  1. Drop time entirely for now. It's an optional field in CloudEvents, though I get the sense that it is very common.
  2. Force clients to explicitly set the time field (no auto-fallback behavior).
  3. Separate the notion of "time of message for event" from the notion of "time of the actual event", and if your event is tied to some database change, it's your responsibility to add it to the data portion of the payload.

Forcing the clients to choose an explicit time is a pain, but I put it that way in the OEP because I thought it would be the least painful thing in the long term. I have a few areas of concern:

Backfill scenarios

Letting the producer automatically fill in the time of the event could cause garbage data to be emitted when running backfills. For instance, if there were a service listening for CourseOverview-related metadata updates, and we decided to bootstrap it by force emitting a bunch of "update" events. Or if we need to re-emit some set of events from the past week because of some bug or other.

People equating time received with time of event

If we remove the time field entirely, I'd be concerned that people would just add their own timestamp based on when the message was received. That would almost work most of the time, but would become inaccurate under heavy load or in the event of a backfill scenario, slowly corrupting our data with things that sorta-kinda-look right, but aren't.

Fuzzy data inaccuracies over time

My long term concern is that we get messy data that looks plausible but is just slightly different, sprinkled inconsistently across random parts of the codebase, making reconciliation a pain. Did that enrollment really happen during the database time or the event emission time? Those times are a second apart and that goes over a date boundary for this small set of enrollments, so which month is the revenue recognized in? These four events use the real database time, but those two others use producer time, so you can't trust them for reporting... etc...

I'm not as familiar with the data pipeline as @bmtcril and others who have been on the data eng team, but my sense is that this sort of thing has been a pain point, and I'd like to address those issues early on. I'm in favor of forcing callers to explicitly set the time to make sure it actually reflects the time the event happened and matches the database records when appropriate.


If we determine that these fields are meant to be in the event itself, in addition to the header, can we skip that? The header is much simpler to manipulate. The event data requires changing the Avro Schema. Can anyone actually determine where this data is meant to live based on the earlier comments?

You mean, can we remove it from the data portion of the event when it goes over the wire if it needs to go into a header anyway? I think so. In general, I think it's preferable to do that. Are there examples other than time where you're thinking of doing this?

Can we ensure that the id, and any other details, make it to the error logs?

Sure, though I'm not sure exactly what you're asking here. Are you thinking that the message-listening part of a service automatically emits an INFO-level log message on each incoming message so that we can see what the following log messages are responding to?

@robrap
Contributor Author

robrap commented Nov 18, 2022

Quick message to say "Thank you @ormsbee!" and I owe a response and haven't gotten to it yet.

Note: If someone wants to groom/pick this up and I haven't responded, please ask. Thanks.

@robrap
Contributor Author

robrap commented Nov 22, 2022

@ormsbee: [inform] Our team has other urgent priorities and will be taking a (hopefully) temporary break from event bus work for a while. I'm trying to determine if there are quick wins here we can complete now, and leave larger efforts until later (i.e. moved to new tickets).

Here are the fields defined in OEP-41:

  • datacontenttype
  • id
  • minorversion (extension)
  • source
  • sourcehost (extension)
  • specversion
  • type
  • time

Noting here some potential updates to OEP-41:

Regarding Avro:

  • We tried to be conformant with the types, which is probably the most important part.
  • I'm unclear if we are missing anything, but I think we are all set on this.

Regarding Kafka Protocol:

  • Discovery would be needed for KeyMapping compliance.
  • Some discovery needed for content type compliance. Example questions:
    • Is content-type supposed to be a standard Kafka header? The CloudEvent docs point to Kafka docs, but I can't find this header.
    • I also don't see it in the Confluent Cloud UI, but maybe some standard Kafka headers are not displayed for some reason?
    • It is unclear whether ce_datacontenttype is even asked for, because it isn't in the sample event.
    • Note: I think this value should be "application/avro" for our Avro events.
  • I finally realized that the spec defines Binary Content Mode (what we should be using right now), and Structured Content Mode (which is not what we are using).
    • For Binary Content Mode, the CloudEvent attributes are just in the header, using the ce_ prefix.
    • My earlier confusion about when to use the header or the data for these attributes came from looking at the "Structured Content Mode" example and not understanding that it doesn't apply to our binary formatted events.
  • There might be other required tidbits buried in this doc.
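The binary vs. structured distinction in the bullets above can be illustrated roughly as follows. The attribute values and payload are made up, and JSON is used for the structured example purely for readability; this is not how any of our code actually serializes events.

```python
import base64
import json

# Illustrative contrast of the two CloudEvents Kafka content modes.
attributes = {
    "id": "b1a2c3d4",
    "source": "/openedx/lms/web",
    "specversion": "1.0",
    "type": "org.openedx.example.v1",
}
avro_payload = b"\x00\x01\x02"  # stand-in for Avro-encoded event data

# Binary content mode (what we use): attributes become ce_-prefixed
# Kafka headers, and the message value is just the encoded data.
binary_headers = [("ce_" + k, v.encode("utf-8")) for k, v in attributes.items()]
binary_value = avro_payload

# Structured content mode (not what we use): the whole event, attributes
# and data together, is serialized into the message value.
structured_value = json.dumps(
    {**attributes, "data_base64": base64.b64encode(avro_payload).decode("ascii")}
).encode("utf-8")
```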

Here is a proposal:

  • ce_datacontenttype
    • Implement now. Use "application/avro".
    • I'm not clear on whether this is required, but shouldn't hurt?
    • I'm not clear if we should also be adding a "content-type" header with the same value? Implementation for this would be quick, but discovery on the right thing to do here might warrant deferral?
  • ce_id
    • Implement now. This should be quick.
  • minorversion (extension)
  • ce_source
    • Defer. Needs some thought. How do we provide different config for web vs workers?
  • sourcehost (extension)
    • Defer, unless we can grab some existing simple host implementation (IP or whatever), and we can always iterate and improve later.
  • ce_specversion
    • Implement now. Easy.
  • ce_type
    • This is done!
  • ce_time
    • Defer. From the above discussion, I think it makes sense to require the producer to pass in this time. However, this requires adding the capability to pass it in, and it requires updating our event code to pass the modified date through, and our current use case couldn't use this data anyway, because the consumer wouldn't be writing this modified date as-is for later comparison. See below for a potential alternative short-term solution.

Questions to be addressed now:

  • Should we drop the ce_ prefix for the "extension" fields? Are these not actually CloudEvent headers? Do we want our own prefix, or just add them as-is? Note - this is only important to answer if we implement "sourcehost" at this time.
  • Should we consider introducing a new and different field named something like produced_time (or some such name), that has a more consistent definition of "time of message for event"? This would be very quick and simple to implement, and gives us something now, and we can always implement ce_time later.
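The produced_time idea in the second question could be as small as the sketch below. The header name and the ISO-8601 bytes encoding are proposals for illustration, not anything implemented.

```python
from datetime import datetime, timezone

# Hypothetical sketch of a produced_time extension header that always
# means "when this message was produced" (name and format are proposals).
def produced_time_header():
    return ("produced_time",
            datetime.now(timezone.utc).isoformat().encode("utf-8"))

name, value = produced_time_header()
```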

Can we ensure that the id, and any other details, make it to the error logs?

Sorry. This wasn't meant to be a question. I just meant that we should update our logging to include the headers.

@ormsbee

ormsbee commented Nov 22, 2022

Quick note before I give a fuller response: the links from the OEP to the spec are broken because they moved the spec and I only pointed to the doc in the branch. But here's the latest, as far as I know: https://github.com/cloudevents/spec/blob/main/cloudevents/spec.md

@robrap
Contributor Author

robrap commented Nov 22, 2022

Quick note before I give a fuller response: the links from the OEP to the spec are broken because they moved the spec and I only pointed to the doc in the branch. But here's the latest, as far as I know: https://github.com/cloudevents/spec/blob/main/cloudevents/spec.md

Thanks. This was more just an FYI about some upkeep on the OEP that anyone of us could make when we have the time.

@ormsbee

ormsbee commented Nov 22, 2022

  • ce_datacontenttype
    • Implement now. Use "application/avro".
    • I'm not clear on whether this is required, but shouldn't hurt?
    • I'm not clear if we should also be adding a "content-type" header with the same value? Implementation for this would be quick, but discovery on the right thing to do here might warrant deferral?

This is meant to describe the content type of the thing in the data attribute, i.e. the actual event payload, not the data envelope it's packaged in. I was going under the assumption that it would be application/json.

  • ce_source
    • Defer. Needs some thought. How do we provide different config for web vs workers?

Okay. My inclination would be to make it an explicit Django setting for now rather than do anything particularly clever.

  • sourcehost (extension)
    • Defer, unless we can grab some existing simple host implementation (IP or whatever), and we can always iterate and improve later.

I think we could call socket.gethostname() and call it a day for this one. Though if the naming causes trouble, this is easily punted on.
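A minimal sketch of that suggestion, where the header name and bytes encoding are assumptions about how we'd attach it to a Kafka message:

```python
import socket

# Sketch: fill the sourcehost extension with the machine's hostname,
# per the socket.gethostname() suggestion above.
def sourcehost_header():
    return ("sourcehost", socket.gethostname().encode("utf-8"))

name, value = sourcehost_header()
```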


With respect to the time thing, if we don't have time to do what we think is the right thing, let's just drop it entirely for now.

@robrap
Contributor Author

robrap commented Nov 22, 2022

  • ce_datacontenttype

    • Implement now. Use "application/avro".
    • I'm not clear on whether this is required, but shouldn't hurt?
    • I'm not clear if we should also be adding a "content-type" header with the same value? Implementation for this would be quick, but discovery on the right thing to do here might warrant deferral?

This is meant to describe the content type of the thing in the data attribute, i.e. the actual event payload, not the data envelope it's packaged in. I was going under the assumption that it would be application/json.

Maybe I'm just reading the Avro and Kafka CloudEvent specs wrong? In any case, I know I'm not clear. Also, we are currently using binary (which I know is not in OEP-41), so I don't think application/json is what we want. But maybe you are saying that you were assuming we would be using JSON and thus would use application/json?

  • ce_source

    • Defer. Needs some thought. How do we provide different config for web vs workers?

Okay. My inclination would be to make it an explicit Django setting for now rather than do anything particularly clever.

I thought settings were shared across both, but hopefully I'm missing something.

I think we could call socket.gethostname() and call it a day for this one. Though if the naming causes trouble, this is easily punted on.

Sounds simple enough.

With respect to the time thing, if we don't have time to do what we think is the right thing, let's just drop it entirely for now.

I like the idea of a new extension field that always means the time the event was produced, but that's just a thought. Even this idea can be punted.

@ormsbee

ormsbee commented Nov 27, 2022

Maybe I'm just reading the Avro and Kafka CloudEvent specs wrong? In any case, I know I'm not clear. Also, we are currently using binary (which I know is not in OEP-41), so I don't think application/json is what we want. But maybe you are saying that you were assuming we would be using JSON and thus would use application/json?

Ah, sorry, I misunderstood what Binary Content Mode implied. I thought it would work like the Structured Content Mode example, where the content-type is for the envelope, but there's an encapsulated CloudEvent tucked inside. When we were talking earlier about taking things from the event and moving them into the header, I thought that meant moving things from the data portion to the CloudEvent metadata fields, and not necessarily to the Kafka headers. You're right–we should be using application/avro.

I thought settings were shared across both, but hopefully I'm missing something.

Ah, right. Maybe we could inspect celery.task? But FWIW, I think it's okay if we ignore the distinction for now and just send a top level one like "lms" based on config.

@robrap
Contributor Author

robrap commented Nov 28, 2022

Ah, right. Maybe we could inspect celery.task? But FWIW, I think it's okay if we ignore the distinction for now and just send a top level one like "lms" based on config.

  1. It looks like some of these are already implemented in the Signal, but it is using now() for time. See: [CloudEvents] CloudEvents for event bus events #77 (comment)
  2. The source is using web, but maybe we'll want to fix that if it isn't actually accurate.

@robrap robrap moved this from Todo to Groomed in Arch-BOM Nov 28, 2022
@rgraber rgraber moved this from Groomed to In Progress in Arch-BOM Nov 28, 2022
@rgraber rgraber self-assigned this Nov 28, 2022
@rgraber
Contributor

rgraber commented Nov 29, 2022

Quick note on implementation:
Until we actually implement https://openedx-events.readthedocs.io/en/latest/decisions/0004-external-event-bus-and-django-signal-events.html we're going to have to update the API of the producer.send() method to take all the signal metadata. As long as producer.send() is called from within a signal listener on the producing service, we should still have all the information we need in the kwargs.
Eventually, we hope to fire send() from within a listener in the event-bus library itself so the producing service won't need to provide the metadata separately.

Ex of what we'll need to do for now:
In the calling service:

@receiver(MY_SIGNAL)
def send_to_event_bus(sender, signal, **kwargs):
    openedx_events.get_producer().send(
        MY_SIGNAL,
        topic='my-topic',
        event_key_field='whatever',
        event_data={'something': kwargs['something']},
        event_metadata=kwargs['metadata'],
    )

@rgraber
Contributor

rgraber commented Nov 29, 2022

@robrap
Picky question: All the examples I see for specversion are of the form "1.0". The only way I know of to get the major version is from the end of the event_type, where it's in the form "v1". Do we care enough to convert? Theoretically it's a simple regex, but anytime I have to parse anything with a regex I feel the need to step back and check.

@robrap
Contributor Author

robrap commented Nov 29, 2022

Picky question: All the examples I see for specversion are of the form "1.0". The only way I know of to get the major version is from the end of the event_type, where it's in the form "v1". Do we care enough to convert? Theoretically it's a simple regex, but anytime I have to parse anything with a regex I feel the need to step back and check.

  1. Start with the OEP for each field. The specversion should be the CloudEvent version, which is static. See https://open-edx-proposals.readthedocs.io/en/latest/architectural-decisions/oep-0041-arch-async-server-event-messaging.html#id4
  2. We are not implementing major or minor version at this time. See the issue description for the list of fields.

@robrap
Contributor Author

robrap commented Nov 30, 2022

@rgraber: On the logging PR for the consumer, how would you feel about adding a set_custom_attribute("message_id", <HEADER ID>) as early as possible after grabbing the header? I know this is separate, but hopefully is simple enough. If you want me to break this out into a separate ticket/PR, just let me know.

@rgraber
Contributor

rgraber commented Dec 1, 2022

Which PR? This guy openedx/event-bus-kafka#87 ?

@robrap
Contributor Author

robrap commented Dec 1, 2022

@rgraber: I decided to create a different ticket for this. See edx/edx-arch-experiments#121. It probably makes sense to do this separately.

@rgraber
Contributor

rgraber commented Dec 2, 2022

Closing this issue as all the headers specified in the Acceptance Criteria are now present.

@robrap
Contributor Author

robrap commented Dec 23, 2022

I'm not sure if this was already noted above, but the metadata of the signal is already coded to have a "time" field that uses now(). See

object.__setattr__(self, "time", datetime.utcnow())

@robrap
Contributor Author

robrap commented Dec 23, 2022

Note that Kafka has a timestamp tuple on its messages that tells the timestamp and how it was determined (source time, or broker receive time). See https://docs.confluent.io/platform/current/clients/confluent-kafka-python/html/index.html#confluent_kafka.Message.timestamp
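The tuple shape can be interpreted like the sketch below. The constants are defined locally to keep the sketch self-contained; they mirror confluent_kafka's TIMESTAMP_* values as I understand them, and in real code you would import them from confluent_kafka instead.

```python
# Sketch of interpreting the Message.timestamp() tuple mentioned above.
# These values are assumed to match confluent_kafka.TIMESTAMP_*.
TIMESTAMP_NOT_AVAILABLE = 0
TIMESTAMP_CREATE_TIME = 1      # timestamp set by the producer
TIMESTAMP_LOG_APPEND_TIME = 2  # timestamp set by the broker on receipt

def describe_timestamp(ts_tuple):
    ts_type, ts_ms = ts_tuple
    if ts_type == TIMESTAMP_NOT_AVAILABLE:
        return "no timestamp"
    kind = "producer" if ts_type == TIMESTAMP_CREATE_TIME else "broker"
    return f"{kind} time: {ts_ms} ms since epoch"
```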
