-
Notifications
You must be signed in to change notification settings - Fork 227
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
http: Batch Events from HTTP Request and Response #829
Changes from 2 commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -8,6 +8,7 @@ package binding | |
import ( | ||
"bytes" | ||
"context" | ||
"encoding/json" | ||
"errors" | ||
"fmt" | ||
"io" | ||
|
@@ -21,6 +22,9 @@ import ( | |
// ErrCannotConvertToEvent is a generic error when a conversion of a Message to an Event fails | ||
var ErrCannotConvertToEvent = errors.New("cannot convert message to event") | ||
|
||
// ErrCannotConvertToEvents is a generic error when a conversion of a Message to a Batched Event fails | ||
var ErrCannotConvertToEvents = errors.New("cannot convert message to batched events") | ||
|
||
// ToEvent translates a Message with a valid Structured or Binary representation to an Event. | ||
// This function returns the Event generated from the Message and the original encoding of the message or | ||
// an error that points the conversion error. | ||
|
@@ -61,6 +65,22 @@ func ToEvent(ctx context.Context, message MessageReader, transformers ...Transfo | |
return &e, Transformers(transformers).Transform((*EventMessage)(&e), encoder) | ||
} | ||
|
||
// ToEvents translates a Batch Message and corresponding Reader data to a slice of Events. | ||
// This function returns the Events generated from the body data, or an error that points | ||
// to the conversion issue. | ||
func ToEvents(ctx context.Context, message MessageReader, body io.Reader) ([]event.Event, error) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Question: can we have a test with a partial invalid event in the batch request body? I would like to understand the semantics of this API, i.e. which states []event.Event can have depending on the input. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Ideally I think it's fine to not initially supporting There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
I started playing with this, and as soon as that interface changes, it requires changes across lots of codebase, so I'm hesitant to make that change. Or maybe that change should be a subsequent PR for a larger architectural change?
Not sure I'm 100% sure I'm following here. Are you saying that there is a way I could get the
👍🏻 There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
You can do something like this:
Then return an error for all other cases. Does that make sense? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 🤔 interesting. Curious - why have the system return an error at runtime, when you could do the check at compile time? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
You can either make There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I see, I see 🤔 that makes sense -- we could set ourselves up so if more implementations of Batch come down the pipe, we could handle them here in this place, and expand the switch. SGTM. I'll make the change to There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Hit a pretty good wrinkle on making this change. I can't reference the concrete type at all within ToEvents -- because I get into a cyclic dependency issue. package github.com/cloudevents/sdk-go/v2
imports github.com/cloudevents/sdk-go/v2/binding
imports github.com/cloudevents/sdk-go/v2/protocol/http
imports github.com/cloudevents/sdk-go/v2/binding: import cycle not allowed So my suggestion - leave this as is, and then as there is a need to create more implementation on Batch processing, implement a |
||
messageEncoding := message.ReadEncoding() | ||
if messageEncoding != EncodingBatch { | ||
return nil, ErrCannotConvertToEvents | ||
} | ||
|
||
// Since Format doesn't support batch Marshalling, and we know it's structured batch json, we'll go direct to the | ||
// json.UnMarshall(), since that is the best way to support batch operations for now. | ||
var events []event.Event | ||
err := json.NewDecoder(body).Decode(&events) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. You could directly return here There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Nice trick. I like it 👍🏻 |
||
return events, err | ||
} | ||
|
||
type messageToEventBuilder event.Event | ||
|
||
var _ StructuredWriter = (*messageToEventBuilder)(nil) | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -6,7 +6,9 @@ | |
package http | ||
|
||
import ( | ||
"bytes" | ||
"context" | ||
"encoding/json" | ||
nethttp "net/http" | ||
|
||
"github.com/cloudevents/sdk-go/v2/binding" | ||
|
@@ -24,3 +26,39 @@ func NewEventFromHTTPResponse(resp *nethttp.Response) (*event.Event, error) { | |
msg := NewMessageFromHttpResponse(resp) | ||
return binding.ToEvent(context.Background(), msg) | ||
} | ||
|
||
// NewEventsFromHTTPRequest returns a batched set of Events from a http.Request | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nit: better from a HTTP request ? |
||
func NewEventsFromHTTPRequest(req *nethttp.Request) ([]event.Event, error) { | ||
msg := NewMessageFromHttpRequest(req) | ||
return binding.ToEvents(context.Background(), msg, msg.BodyReader) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Question: use req.Context() here? Not sure if that provides value/advantage in this case. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I was being consistent with all the other utility functions as previously written (they all used |
||
} | ||
|
||
// NewEventsFromHTTPResponse returns a batched set of Events from a http.Response | ||
func NewEventsFromHTTPResponse(resp *nethttp.Response) ([]event.Event, error) { | ||
msg := NewMessageFromHttpResponse(resp) | ||
return binding.ToEvents(context.Background(), msg, msg.BodyReader) | ||
} | ||
|
||
// NewHTTPRequestFromEvents creates a http.Request object that can be used with any http.Client. | ||
func NewHTTPRequestFromEvents(ctx context.Context, url string, events []event.Event) (*nethttp.Request, error) { | ||
// Sending batch events is quite straightforward, as there is only JSON format, so a simple implementation. | ||
for _, e := range events { | ||
if err := e.Validate(); err != nil { | ||
return nil, err | ||
} | ||
} | ||
var buffer bytes.Buffer | ||
err := json.NewEncoder(&buffer).Encode(events) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Just for my own education, here we just encode the events directly, but above for a single event we use There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The only way to send batch is in a JSON encoded format, so there is no need for much of that logic. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. So we don't need to worry about things like the "transformers"? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 🤔 What is there to transform? From my reading of the batch spec, it's JSON in, JSON out. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. LOL I dunno. I just noticed that for the single event it has that transform stuff and I didn't want to miss anything. @lionelvillard any idea what this transform stuff is for and why we need it for single events but not batch? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I figure at least we don't need it for the utility functions, since none of the existing functions in The good thing is, if there was a need to add an optional list of transformers to the method signature isn't going to break. |
||
if err != nil { | ||
return nil, err | ||
} | ||
|
||
request, err := nethttp.NewRequestWithContext(ctx, nethttp.MethodPost, url, &buffer) | ||
if err != nil { | ||
return nil, err | ||
} | ||
|
||
request.Header.Set(ContentType, event.ApplicationCloudEventsBatchJSON) | ||
|
||
return request, nil | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -8,16 +8,17 @@ package http | |
import ( | ||
"bytes" | ||
"context" | ||
"io" | ||
"io/ioutil" | ||
"net/http" | ||
"net/http/httptest" | ||
"testing" | ||
|
||
"github.com/stretchr/testify/require" | ||
|
||
"github.com/cloudevents/sdk-go/v2/binding" | ||
"github.com/cloudevents/sdk-go/v2/event" | ||
"github.com/cloudevents/sdk-go/v2/test" | ||
"github.com/google/uuid" | ||
"github.com/stretchr/testify/require" | ||
) | ||
|
||
func TestNewEventFromHttpRequest(t *testing.T) { | ||
|
@@ -89,3 +90,67 @@ func TestNewEventFromHttpResponse(t *testing.T) { | |
}) | ||
} | ||
} | ||
|
||
func TestNewEventsFromHTTPRequest(t *testing.T) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can you add a test where There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Thanks for this suggestion! Ended up leading me to the code to see how easy it would be to do a Updates to tests also implemented. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can you please add test cases which actually assert an error? |
||
req := httptest.NewRequest("POST", "http://localhost", bytes.NewReader([]byte(`[{"data":"foo","datacontenttype":"application/json","id":"id","source":"source","specversion":"1.0","type":"type"}]`))) | ||
req.Header.Set(ContentType, event.ApplicationCloudEventsBatchJSON) | ||
|
||
events, err := NewEventsFromHTTPRequest(req) | ||
require.NoError(t, err) | ||
require.Len(t, events, 1) | ||
test.AssertEvent(t, events[0], test.IsValid()) | ||
} | ||
|
||
func TestNewEventsFromHTTPResponse(t *testing.T) { | ||
data := `[{"data":"foo","datacontenttype":"application/json","id":"id","source":"source","specversion":"1.0","type":"type"}]` | ||
resp := http.Response{ | ||
Header: http.Header{ | ||
"Content-Type": {event.ApplicationCloudEventsBatchJSON}, | ||
}, | ||
Body: io.NopCloser(bytes.NewReader([]byte(data))), | ||
ContentLength: int64(len(data)), | ||
} | ||
events, err := NewEventsFromHTTPResponse(&resp) | ||
require.NoError(t, err) | ||
require.Len(t, events, 1) | ||
test.AssertEvent(t, events[0], test.IsValid()) | ||
} | ||
|
||
func TestNewHTTPRequestFromEvents(t *testing.T) { | ||
var events []event.Event | ||
e := event.New() | ||
e.SetID(uuid.New().String()) | ||
e.SetSource("example/uri") | ||
e.SetType("example.type") | ||
require.NoError(t, e.SetData(event.ApplicationJSON, map[string]string{"hello": "world"})) | ||
events = append(events, e.Clone()) | ||
|
||
e.SetID(uuid.New().String()) | ||
require.NoError(t, e.SetData(event.ApplicationJSON, map[string]string{"goodbye": "world"})) | ||
events = append(events, e) | ||
|
||
require.Len(t, events, 2) | ||
require.NotEqual(t, events[0], events[1]) | ||
|
||
// echo back what we get, so we can compare events at either side. | ||
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { | ||
w.Header().Set(ContentType, r.Header.Get(ContentType)) | ||
b, err := io.ReadAll(r.Body) | ||
require.NoError(t, err) | ||
require.NoError(t, r.Body.Close()) | ||
_, err = w.Write(b) | ||
require.NoError(t, err) | ||
})) | ||
defer ts.Close() | ||
|
||
req, err := NewHTTPRequestFromEvents(context.Background(), ts.URL, events) | ||
require.NoError(t, err) | ||
|
||
resp, err := ts.Client().Do(req) | ||
require.NoError(t, err) | ||
|
||
result, err := NewEventsFromHTTPResponse(resp) | ||
require.NoError(t, err) | ||
|
||
require.Equal(t, events, result) | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The error threw me off until I saw your implementation in
to_event.go
- perhaps a short comment that this is handled by the other package?