-
Notifications
You must be signed in to change notification settings - Fork 227
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
issue 814 - Add binary content mode for NATS and JetStream protocols #929
Conversation
prefix = "ce-" | ||
contentTypeHeader = "content-type" | ||
// see https://github.com/cloudevents/spec/blob/main/cloudevents/bindings/nats-protocol-binding.md | ||
applicationCloudEvent = "application/cloudevents" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I wonder why application/cloudevents+json
is not used for structured mode in the binding proposal?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not sure. I was following the proposal document. If you like, I can change this value and the proposal document.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@embano1 Re-posting the question, in case it was missed:
Would you like me to change this to application/cloudevents+json
? Would you also like me to change the proposal to match?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
IMHO that would make sense to clearly distinguish between the two transport modes. It is also used in the Kafka protocol implementation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
After thinking about this, those who might be using structured mode might not be setting NATS headers at all, including the content-type. I figure those using binary mode do in fact have to set NATS headers, including the required cloudEvent headers.
rebase needed |
8bf3b3c
to
640938e
Compare
5ff7c51
to
71cbafa
Compare
71cbafa
to
a5e580c
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: could you please also fix the comment (whitespace) in https://github.com/cloudevents/sdk-go/pull/929/files#diff-901bae262fd5b49dd32933169ba6f38f313ce82311d8fc4fbb91cee7ac3d003aR39
Is this still accurate when looking at this code? // NewMessage wraps an *nats.Msg in a binding.Message.
// The returned message *can* be read several times safely
func NewMessage(msg *nats.Msg) *Message {
encoding := binding.EncodingStructured
if msg.Header != nil {
if msg.Header.Get(contentTypeHeader) == cloudevents.ApplicationCloudEventsJSON {
encoding = binding.EncodingStructured
} else if msg.Header.Get(specs.PrefixedSpecVersionName()) != "" {
encoding = binding.EncodingBinary
}
}
return &Message{Msg: msg, encoding: encoding}
} |
a5e580c
to
4555e3d
Compare
The issue is not on the receive. The issue is on the send. The sender without the context explicitly set would send a structured message before this change. Now the sender without a context set would send a binary message. This is based on the behavior set forth in the binding struct. It is easily controlled via the context. |
4555e3d
to
529861b
Compare
@embano1 , I added an integration test for nats_jetstream because there didn't seem to have any. I patterned it after the nats integration test, but added binary and structured messages. |
70c1046
to
06d6005
Compare
@stephen-totty-hpe we are not using Go |
f9ce122
to
7ba8692
Compare
7ba8692
to
a10d541
Compare
) | ||
|
||
func TestSendReceiveStructuredAndBinary(t *testing.T) { | ||
t.Skip("Need a running NATS server with the -js option. Turning on -js will break non jetstream tests.") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The current use of nats-streaming
implementation in our CI is deprecated. In order to not push too much work on your end here, please update the Github integration workflow by adding a separate service for nats-jetstream
and configure your tests to leverage this instance.
Skipping tests should not be an option here :)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@embano1 , changed the port to 4223. re-enabled the test.
Running docker locally with "docker run -p 4223:4223 nats:latest -js -p 4223"
Let me know if my github actions workflow looks correct.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@embano1 , I tried adding integration tests for nats_jetstream. They never existed. I was trying to be a good citizen. Running into a problem with github actions where I cannot pass arguments to the entrypoint. Seems like missing functionality in github actions. Should I remove the integration test completely from my pull request? Not sure how to move forward with integration tests still in the PR. Unless I can somehow get the service to work. Running out of ideas.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
switching to bitnami/nats:latest instead of nats:latest.
bitnami/nats:latest allows using environment variables instead of command line variables, which should get around the github action limitation.
b8b7052
to
93f7749
Compare
@stephen-totty-hpe looks like we're good here :) Just to confirm: are these the new integration tests you added? Can't really tell from the test names in the logs vs your |
Yes, there were no jetstream integration tests before this PR. Now you should see: --- PASS: TestSendReceiveStructuredAndBinary (0.18s) The NATS tests run the combination of: |
Yeah, I think I'm seeing that 2023-09-11T19:14:50.0342500Z === RUN TestSendReceiveStructuredAndBinary
2023-09-11T19:14:50.0343349Z === RUN TestSendReceiveStructuredAndBinary/regular_subscriber_-_structured
2023-09-11T19:14:50.0344999Z === RUN TestSendReceiveStructuredAndBinary/regular_subscriber_-_structured/Event{"specversion":"1.0","id":"full-event-0","source":"http://example.com/source","type":"com.example.FullEvent","subject":"topic","datacontenttype":"text/json","dataschema":"http://example.com/schema","time":"2020-03-21T12:34:56.78Z","data":"hello","exbool":true,"exint":42,"exstring":"exstring","exbinary":"AAECAw==","exurl":"http://example.com/source","extime":"2020-03-21T12:34:56.78Z"}
2023-09-11T19:14:50.0347027Z === RUN TestSendReceiveStructuredAndBinary/regular_subscriber_-_structured/Event{"specversion":"0.3","id":"full-event-1","source":"http://example.com/source","type":"com.example.FullEvent","subject":"topic","datacontenttype":"text/json","schemaurl":"http://example.com/schema","time":"2020-03-21T12:34:56.78Z","data":"hello","extime":"2020-03-21T12:34:56.78Z","exbool":true,"exint":42,"exstring":"exstring","exbinary":"AAECAw==","exurl":"http://example.com/source"}
2023-09-11T19:14:50.0348755Z === RUN TestSendReceiveStructuredAndBinary/regular_subscriber_-_structured/Event{"specversion":"1.0","id":"min-event-2","source":"http://example.com/source","type":"com.example.MinEvent"}
2023-09-11T19:14:50.0349849Z === RUN TestSendReceiveStructuredAndBinary/regular_subscriber_-_structured/Event{"specversion":"0.3","id":"min-event-3","source":"http://example.com/source","type":"com.example.MinEvent"}
2023-09-11T19:14:50.0350583Z === RUN TestSendReceiveStructuredAndBinary/queue_subscriber_-_structured
2023-09-11T19:14:50.0352103Z === RUN TestSendReceiveStructuredAndBinary/queue_subscriber_-_structured/Event{"specversion":"1.0","id":"full-event-0","source":"http://example.com/source","type":"com.example.FullEvent","subject":"topic","datacontenttype":"text/json","dataschema":"http://example.com/schema","time":"2020-03-21T12:34:56.78Z","data":"hello","exbinary":"AAECAw==","exurl":"http://example.com/source","extime":"2020-03-21T12:34:56.78Z","exbool":true,"exint":42,"exstring":"exstring"}
2023-09-11T19:14:50.0354127Z === RUN TestSendReceiveStructuredAndBinary/queue_subscriber_-_structured/Event{"specversion":"0.3","id":"full-event-1","source":"http://example.com/source","type":"com.example.FullEvent","subject":"topic","datacontenttype":"text/json","schemaurl":"http://example.com/schema","time":"2020-03-21T12:34:56.78Z","data":"hello","exbinary":"AAECAw==","exurl":"http://example.com/source","extime":"2020-03-21T12:34:56.78Z","exbool":true,"exint":42,"exstring":"exstring"}
2023-09-11T19:14:50.0355450Z === RUN TestSendReceiveStructuredAndBinary/queue_subscriber_-_structured/Event{"specversion":"1.0","id":"min-event-2","source":"http://example.com/source","type":"com.example.MinEvent"}
2023-09-11T19:14:50.0356468Z === RUN TestSendReceiveStructuredAndBinary/queue_subscriber_-_structured/Event{"specversion":"0.3","id":"min-event-3","source":"http://example.com/source","type":"com.example.MinEvent"}
2023-09-11T19:14:50.0357196Z === RUN TestSendReceiveStructuredAndBinary/regular_subscriber_-_binary
2023-09-11T19:14:50.0358703Z === RUN TestSendReceiveStructuredAndBinary/regular_subscriber_-_binary/Event{"specversion":"1.0","id":"full-event-0","source":"http://example.com/source","type":"com.example.FullEvent","subject":"topic","datacontenttype":"text/json","dataschema":"http://example.com/schema","time":"2020-03-21T12:34:56.78Z","data":"hello","exstring":"exstring","exbinary":"AAECAw==","exurl":"http://example.com/source","extime":"2020-03-21T12:34:56.78Z","exbool":true,"exint":42}
2023-09-11T19:14:50.0360869Z === RUN TestSendReceiveStructuredAndBinary/regular_subscriber_-_binary/Event{"specversion":"0.3","id":"full-event-1","source":"http://example.com/source","type":"com.example.FullEvent","subject":"topic","datacontenttype":"text/json","schemaurl":"http://example.com/schema","time":"2020-03-21T12:34:56.78Z","data":"hello","exbinary":"AAECAw==","exurl":"http://example.com/source","extime":"2020-03-21T12:34:56.78Z","exbool":true,"exint":42,"exstring":"exstring"}
2023-09-11T19:14:50.0362491Z === RUN TestSendReceiveStructuredAndBinary/regular_subscriber_-_binary/Event{"specversion":"1.0","id":"min-event-2","source":"http://example.com/source","type":"com.example.MinEvent"}
2023-09-11T19:14:50.0866915Z === RUN TestSendReceiveStructuredAndBinary/regular_subscriber_-_binary/Event{"specversion":"0.3","id":"min-event-3","source":"http://example.com/source","type":"com.example.MinEvent"}
2023-09-11T19:14:50.0867729Z === RUN TestSendReceiveStructuredAndBinary/queue_subscriber_-_binary
2023-09-11T19:14:50.0874197Z === RUN TestSendReceiveStructuredAndBinary/queue_subscriber_-_binary/Event{"specversion":"1.0","id":"full-event-0","source":"http://example.com/source","type":"com.example.FullEvent","subject":"topic","datacontenttype":"text/json","dataschema":"http://example.com/schema","time":"2020-03-21T12:34:56.78Z","data":"hello","exbool":true,"exint":42,"exstring":"exstring","exbinary":"AAECAw==","exurl":"http://example.com/source","extime":"2020-03-21T12:34:56.78Z"}
2023-09-11T19:14:50.0876693Z === RUN TestSendReceiveStructuredAndBinary/queue_subscriber_-_binary/Event{"specversion":"0.3","id":"full-event-1","source":"http://example.com/source","type":"com.example.FullEvent","subject":"topic","datacontenttype":"text/json","schemaurl":"http://example.com/schema","time":"2020-03-21T12:34:56.78Z","data":"hello","exint":42,"exstring":"exstring","exbinary":"AAECAw==","exurl":"http://example.com/source","extime":"2020-03-21T12:34:56.78Z","exbool":true}
2023-09-11T19:14:50.0878084Z === RUN TestSendReceiveStructuredAndBinary/queue_subscriber_-_binary/Event{"specversion":"1.0","id":"min-event-2","source":"http://example.com/source","type":"com.example.MinEvent"}
2023-09-11T19:14:50.0879085Z === RUN TestSendReceiveStructuredAndBinary/queue_subscriber_-_binary/Event{"specversion":"0.3","id":"min-event-3","source":"http://example.com/source","type":"com.example.MinEvent"}
2023-09-11T19:14:50.0880066Z --- PASS: TestSendReceiveStructuredAndBinary (0.18s)
2023-09-11T19:14:50.0880828Z --- PASS: TestSendReceiveStructuredAndBinary/regular_subscriber_-_structured (0.05s)
2023-09-11T19:14:50.0883489Z --- PASS: TestSendReceiveStructuredAndBinary/regular_subscriber_-_structured/Event{"specversion":"1.0","id":"full-event-0","source":"http://example.com/source","type":"com.example.FullEvent","subject":"topic","datacontenttype":"text/json","dataschema":"http://example.com/schema","time":"2020-03-21T12:34:56.78Z","data":"hello","exbool":true,"exint":42,"exstring":"exstring","exbinary":"AAECAw==","exurl":"http://example.com/source","extime":"2020-03-21T12:34:56.78Z"} (0.01s)
2023-09-11T19:14:50.0887367Z --- PASS: TestSendReceiveStructuredAndBinary/regular_subscriber_-_structured/Event{"specversion":"0.3","id":"full-event-1","source":"http://example.com/source","type":"com.example.FullEvent","subject":"topic","datacontenttype":"text/json","schemaurl":"http://example.com/schema","time":"2020-03-21T12:34:56.78Z","data":"hello","extime":"2020-03-21T12:34:56.78Z","exbool":true,"exint":42,"exstring":"exstring","exbinary":"AAECAw==","exurl":"http://example.com/source"} (0.01s)
2023-09-11T19:14:50.0889370Z --- PASS: TestSendReceiveStructuredAndBinary/regular_subscriber_-_structured/Event{"specversion":"1.0","id":"min-event-2","source":"http://example.com/source","type":"com.example.MinEvent"} (0.01s)
2023-09-11T19:14:50.0891062Z --- PASS: TestSendReceiveStructuredAndBinary/regular_subscriber_-_structured/Event{"specversion":"0.3","id":"min-event-3","source":"http://example.com/source","type":"com.example.MinEvent"} (0.01s)
2023-09-11T19:14:50.0891971Z --- PASS: TestSendReceiveStructuredAndBinary/queue_subscriber_-_structured (0.04s)
2023-09-11T19:14:50.0894597Z --- PASS: TestSendReceiveStructuredAndBinary/queue_subscriber_-_structured/Event{"specversion":"1.0","id":"full-event-0","source":"http://example.com/source","type":"com.example.FullEvent","subject":"topic","datacontenttype":"text/json","dataschema":"http://example.com/schema","time":"2020-03-21T12:34:56.78Z","data":"hello","exbinary":"AAECAw==","exurl":"http://example.com/source","extime":"2020-03-21T12:34:56.78Z","exbool":true,"exint":42,"exstring":"exstring"} (0.01s)
2023-09-11T19:14:50.0898005Z --- PASS: TestSendReceiveStructuredAndBinary/queue_subscriber_-_structured/Event{"specversion":"0.3","id":"full-event-1","source":"http://example.com/source","type":"com.example.FullEvent","subject":"topic","datacontenttype":"text/json","schemaurl":"http://example.com/schema","time":"2020-03-21T12:34:56.78Z","data":"hello","exbinary":"AAECAw==","exurl":"http://example.com/source","extime":"2020-03-21T12:34:56.78Z","exbool":true,"exint":42,"exstring":"exstring"} (0.01s)
2023-09-11T19:14:50.1146773Z --- PASS: TestSendReceiveStructuredAndBinary/queue_subscriber_-_structured/Event{"specversion":"1.0","id":"min-event-2","source":"http://example.com/source","type":"com.example.MinEvent"} (0.01s)
2023-09-11T19:14:50.1148451Z --- PASS: TestSendReceiveStructuredAndBinary/queue_subscriber_-_structured/Event{"specversion":"0.3","id":"min-event-3","source":"http://example.com/source","type":"com.example.MinEvent"} (0.01s)
2023-09-11T19:14:50.1149697Z --- PASS: TestSendReceiveStructuredAndBinary/regular_subscriber_-_binary (0.04s)
2023-09-11T19:14:50.1152432Z --- PASS: TestSendReceiveStructuredAndBinary/regular_subscriber_-_binary/Event{"specversion":"1.0","id":"full-event-0","source":"http://example.com/source","type":"com.example.FullEvent","subject":"topic","datacontenttype":"text/json","dataschema":"http://example.com/schema","time":"2020-03-21T12:34:56.78Z","data":"hello","exstring":"exstring","exbinary":"AAECAw==","exurl":"http://example.com/source","extime":"2020-03-21T12:34:56.78Z","exbool":true,"exint":42} (0.01s)
2023-09-11T19:14:50.1155622Z --- PASS: TestSendReceiveStructuredAndBinary/regular_subscriber_-_binary/Event{"specversion":"0.3","id":"full-event-1","source":"http://example.com/source","type":"com.example.FullEvent","subject":"topic","datacontenttype":"text/json","schemaurl":"http://example.com/schema","time":"2020-03-21T12:34:56.78Z","data":"hello","exbinary":"AAECAw==","exurl":"http://example.com/source","extime":"2020-03-21T12:34:56.78Z","exbool":true,"exint":42,"exstring":"exstring"} (0.01s)
2023-09-11T19:14:50.1157529Z --- PASS: TestSendReceiveStructuredAndBinary/regular_subscriber_-_binary/Event{"specversion":"1.0","id":"min-event-2","source":"http://example.com/source","type":"com.example.MinEvent"} (0.01s)
2023-09-11T19:14:50.1159085Z --- PASS: TestSendReceiveStructuredAndBinary/regular_subscriber_-_binary/Event{"specversion":"0.3","id":"min-event-3","source":"http://example.com/source","type":"com.example.MinEvent"} (0.01s)
2023-09-11T19:14:50.1170014Z --- PASS: TestSendReceiveStructuredAndBinary/queue_subscriber_-_binary (0.04s)
2023-09-11T19:14:50.1172750Z --- PASS: TestSendReceiveStructuredAndBinary/queue_subscriber_-_binary/Event{"specversion":"1.0","id":"full-event-0","source":"http://example.com/source","type":"com.example.FullEvent","subject":"topic","datacontenttype":"text/json","dataschema":"http://example.com/schema","time":"2020-03-21T12:34:56.78Z","data":"hello","exbool":true,"exint":42,"exstring":"exstring","exbinary":"AAECAw==","exurl":"http://example.com/source","extime":"2020-03-21T12:34:56.78Z"} (0.01s)
2023-09-11T19:14:50.1175796Z --- PASS: TestSendReceiveStructuredAndBinary/queue_subscriber_-_binary/Event{"specversion":"0.3","id":"full-event-1","source":"http://example.com/source","type":"com.example.FullEvent","subject":"topic","datacontenttype":"text/json","schemaurl":"http://example.com/schema","time":"2020-03-21T12:34:56.78Z","data":"hello","exint":42,"exstring":"exstring","exbinary":"AAECAw==","exurl":"http://example.com/source","extime":"2020-03-21T12:34:56.78Z","exbool":true} (0.01s)
2023-09-11T19:14:50.1177576Z --- PASS: TestSendReceiveStructuredAndBinary/queue_subscriber_-_binary/Event{"specversion":"1.0","id":"min-event-2","source":"http://example.com/source","type":"com.example.MinEvent"} (0.01s)
2023-09-11T19:14:50.1179364Z --- PASS: TestSendReceiveStructuredAndBinary/queue_subscriber_-_binary/Event{"specversion":"0.3","id":"min-event-3","source":"http://example.com/source","type":"com.example.MinEvent"} (0.01s)
2023-09-11T19:14:50.1179866Z PASS
2023-09-11T19:14:50.1180126Z coverage: [no statements]
2023-09-11T19:14:50.1180676Z ok github.com/cloudevents/sdk-go/test/integration/nats_jetstream 0.216s coverage: [no statements] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nice work 🥳
cc/ @duglin for final 👀
Seems good to me, but I'd like @lionelvillard to give it a quick scan too. |
|
Any chance this is getting merged soon? |
Pinging @lionelvillard |
LGTM |
Signed-off-by: stephen-totty-hpe <stephen.totty@hpe.com>
93f7749
to
510b002
Compare
Merging, failing integration tests fixed in #963 |
Our team would like to use NATS jetstream with the cloud event-SDK. Currently binary mode is not implemented for the NATS jetstream protocol (#814).
This PR is an enhancement to support binary-mode in the NATS jetstream protocol.
Using a binary message writer implementing MessageMetadataWriter and BinaryWriter interfaces, the cloud event headers are passed as NATS headers. The cloud event data is pass as NATS data.
Message implements the MessageMetadataReader interface. Also the ReadBinary() method has been implemented.
One caveat:
Due to https://github.com/cloudevents/sdk-go/blob/main/v2/binding/write.go#L91, the default now would be Binary when the context is not set due to the third if statement.
Before adding this change, there was no binaryWriter defined, so the default would be Structured in the fourth if statement.
This can easily be controlled with WithForceBinary, WithForceStructured, WithSkipDirectStructuredEncoding, WithSkipDirectBinaryEncoding.
However, if no context was defined, then the default behavior is different.
I could check to see if the context is defined, but the context types are not exported in the binding package.
Let me know how to proceed if this change in behavior needs to be addressed differently.