diff --git a/v2/alias.go b/v2/alias.go index ed64b4c0c..2fbfaa9a7 100644 --- a/v2/alias.go +++ b/v2/alias.go @@ -135,8 +135,14 @@ var ( ToMessage = binding.ToMessage // Event Creation - NewEventFromHTTPRequest = http.NewEventFromHTTPRequest - NewEventFromHTTPResponse = http.NewEventFromHTTPResponse + + NewEventFromHTTPRequest = http.NewEventFromHTTPRequest + NewEventFromHTTPResponse = http.NewEventFromHTTPResponse + NewEventsFromHTTPRequest = http.NewEventsFromHTTPRequest + NewEventsFromHTTPResponse = http.NewEventsFromHTTPResponse + NewHTTPRequestFromEvent = http.NewHTTPRequestFromEvent + NewHTTPRequestFromEvents = http.NewHTTPRequestFromEvents + IsHTTPBatch = http.IsHTTPBatch // HTTP Messages diff --git a/v2/binding/encoding.go b/v2/binding/encoding.go index 16611a3d7..5070b7295 100644 --- a/v2/binding/encoding.go +++ b/v2/binding/encoding.go @@ -19,6 +19,9 @@ const ( EncodingEvent // When the encoding is unknown (which means that the message is a non-event) EncodingUnknown + + // EncodingBatch is an instance of JSON Batched Events + EncodingBatch ) func (e Encoding) String() string { @@ -29,6 +32,8 @@ func (e Encoding) String() string { return "structured" case EncodingEvent: return "event" + case EncodingBatch: + return "batch" case EncodingUnknown: return "unknown" } diff --git a/v2/binding/format/format.go b/v2/binding/format/format.go index 2d840025e..6bdd1842b 100644 --- a/v2/binding/format/format.go +++ b/v2/binding/format/format.go @@ -7,6 +7,7 @@ package format import ( "encoding/json" + "errors" "fmt" "strings" @@ -41,12 +42,33 @@ func (jsonFmt) Unmarshal(b []byte, e *event.Event) error { return json.Unmarshal(b, e) } +// JSONBatch is the built-in "application/cloudevents-batch+json" format. +var JSONBatch = jsonBatchFmt{} + +type jsonBatchFmt struct{} + +func (jb jsonBatchFmt) MediaType() string { + return event.ApplicationCloudEventsBatchJSON +} + +// Marshal will return an error for jsonBatchFmt since the Format interface doesn't support batch Marshalling, and we +// know it's structured batch json, we'll go direct to the json.UnMarshall() (see `ToEvents()`) since that is the best +// way to support batch operations for now. +func (jb jsonBatchFmt) Marshal(e *event.Event) ([]byte, error) { + return nil, errors.New("not supported for batch events") +} + +func (jb jsonBatchFmt) Unmarshal(b []byte, e *event.Event) error { + return errors.New("not supported for batch events") +} + // built-in formats var formats map[string]Format func init() { formats = map[string]Format{} Add(JSON) + Add(JSONBatch) } // Lookup returns the format for contentType, or nil if not found. diff --git a/v2/binding/to_event.go b/v2/binding/to_event.go index 339a7833c..d3332c158 100644 --- a/v2/binding/to_event.go +++ b/v2/binding/to_event.go @@ -8,6 +8,7 @@ package binding import ( "bytes" "context" + "encoding/json" "errors" "fmt" "io" @@ -21,6 +22,9 @@ import ( // ErrCannotConvertToEvent is a generic error when a conversion of a Message to an Event fails var ErrCannotConvertToEvent = errors.New("cannot convert message to event") +// ErrCannotConvertToEvents is a generic error when a conversion of a Message to a Batched Event fails +var ErrCannotConvertToEvents = errors.New("cannot convert message to batched events") + // ToEvent translates a Message with a valid Structured or Binary representation to an Event. // This function returns the Event generated from the Message and the original encoding of the message or // an error that points the conversion error. @@ -61,6 +65,21 @@ func ToEvent(ctx context.Context, message MessageReader, transformers ...Transfo return &e, Transformers(transformers).Transform((*EventMessage)(&e), encoder) } +// ToEvents translates a Batch Message and corresponding Reader data to a slice of Events. +// This function returns the Events generated from the body data, or an error that points +// to the conversion issue. +func ToEvents(ctx context.Context, message MessageReader, body io.Reader) ([]event.Event, error) { + messageEncoding := message.ReadEncoding() + if messageEncoding != EncodingBatch { + return nil, ErrCannotConvertToEvents + } + + // Since Format doesn't support batch Marshalling, and we know it's structured batch json, we'll go direct to the + // json.UnMarshall(), since that is the best way to support batch operations for now. + var events []event.Event + return events, json.NewDecoder(body).Decode(&events) +} + type messageToEventBuilder event.Event var _ StructuredWriter = (*messageToEventBuilder)(nil) diff --git a/v2/binding/to_event_test.go b/v2/binding/to_event_test.go index 0a0e00c00..185744a23 100644 --- a/v2/binding/to_event_test.go +++ b/v2/binding/to_event_test.go @@ -7,8 +7,12 @@ package binding_test import ( "context" + "io" + nethttp "net/http" + "strings" "testing" + "github.com/cloudevents/sdk-go/v2/protocol/http" "github.com/stretchr/testify/require" "github.com/cloudevents/sdk-go/v2/binding" @@ -159,3 +163,49 @@ func TestToEvent_transformers_applied_once(t *testing.T) { } }) } + +func TestToEvents(t *testing.T) { + fixture := map[string]struct { + contentType string + jsn string + expected func(*testing.T, []event.Event, error) + }{ + "valid event": { + jsn: `[{"data":"foo","datacontenttype":"application/json","id":"id","source":"source","specversion":"1.0","type":"type"}]`, + expected: func(t *testing.T, list []event.Event, err error) { + require.NoError(t, err) + require.Len(t, list, 1) + require.Equal(t, "id", list[0].ID()) + }, + }, + "invalid event": { + jsn: `[{"data":"foo","datacontenttype":"application/json","specversion":"0.1","type":"type"}]`, + expected: func(t *testing.T, _ []event.Event, err error) { + require.ErrorContains(t, err, "specversion: unknown value") + }, + }, + "bad content type": { + jsn: `[{"data":"foo","datacontenttype":"application/json","id":"id","source":"source","specversion":"1.0","type":"type"}]`, + contentType: event.ApplicationJSON, + expected: func(t *testing.T, _ []event.Event, err error) { + require.ErrorContains(t, err, "cannot convert message to batched events") + }, + }, + } + + for k, v := range fixture { + t.Run(k, func(t *testing.T) { + header := nethttp.Header{} + if len(v.contentType) == 0 { + header.Set(http.ContentType, event.ApplicationCloudEventsBatchJSON) + } else { + header.Set(http.ContentType, v.contentType) + } + + buf := io.NopCloser(strings.NewReader(v.jsn)) + msg := http.NewMessage(header, buf) + list, err := binding.ToEvents(context.Background(), msg, msg.BodyReader) + v.expected(t, list, err) + }) + } +} diff --git a/v2/protocol/http/message.go b/v2/protocol/http/message.go index e7e51d034..7a7c36f9b 100644 --- a/v2/protocol/http/message.go +++ b/v2/protocol/http/message.go @@ -92,6 +92,9 @@ func (m *Message) ReadEncoding() binding.Encoding { return binding.EncodingBinary } if m.format != nil { + if m.format == format.JSONBatch { + return binding.EncodingBatch + } return binding.EncodingStructured } return binding.EncodingUnknown diff --git a/v2/protocol/http/utility.go b/v2/protocol/http/utility.go index d46a33461..350fc1cf6 100644 --- a/v2/protocol/http/utility.go +++ b/v2/protocol/http/utility.go @@ -6,7 +6,9 @@ package http import ( + "bytes" "context" + "encoding/json" nethttp "net/http" "github.com/cloudevents/sdk-go/v2/binding" @@ -24,3 +26,64 @@ func NewEventFromHTTPResponse(resp *nethttp.Response) (*event.Event, error) { msg := NewMessageFromHttpResponse(resp) return binding.ToEvent(context.Background(), msg) } + +// NewEventsFromHTTPRequest returns a batched set of Events from a HTTP Request +func NewEventsFromHTTPRequest(req *nethttp.Request) ([]event.Event, error) { + msg := NewMessageFromHttpRequest(req) + return binding.ToEvents(context.Background(), msg, msg.BodyReader) +} + +// NewEventsFromHTTPResponse returns a batched set of Events from a HTTP Response +func NewEventsFromHTTPResponse(resp *nethttp.Response) ([]event.Event, error) { + msg := NewMessageFromHttpResponse(resp) + return binding.ToEvents(context.Background(), msg, msg.BodyReader) +} + +// NewHTTPRequestFromEvent creates a http.Request object that can be used with any http.Client for a singular event. +// This is an HTTP POST action to the provided url. +func NewHTTPRequestFromEvent(ctx context.Context, url string, event event.Event) (*nethttp.Request, error) { + if err := event.Validate(); err != nil { + return nil, err + } + + req, err := nethttp.NewRequestWithContext(ctx, nethttp.MethodPost, url, nil) + if err != nil { + return nil, err + } + if err := WriteRequest(ctx, (*binding.EventMessage)(&event), req); err != nil { + return nil, err + } + + return req, nil +} + +// NewHTTPRequestFromEvents creates a http.Request object that can be used with any http.Client for sending +// a batched set of events. This is an HTTP POST action to the provided url. +func NewHTTPRequestFromEvents(ctx context.Context, url string, events []event.Event) (*nethttp.Request, error) { + // Sending batch events is quite straightforward, as there is only JSON format, so a simple implementation. + for _, e := range events { + if err := e.Validate(); err != nil { + return nil, err + } + } + var buffer bytes.Buffer + err := json.NewEncoder(&buffer).Encode(events) + if err != nil { + return nil, err + } + + request, err := nethttp.NewRequestWithContext(ctx, nethttp.MethodPost, url, &buffer) + if err != nil { + return nil, err + } + + request.Header.Set(ContentType, event.ApplicationCloudEventsBatchJSON) + + return request, nil +} + +// IsHTTPBatch returns if the current http.Request or http.Response is a batch event operation, by checking the +// header `Content-Type` value. +func IsHTTPBatch(header nethttp.Header) bool { + return header.Get(ContentType) == event.ApplicationCloudEventsBatchJSON +} diff --git a/v2/protocol/http/utility_test.go b/v2/protocol/http/utility_test.go index 992751cb4..a0583094e 100644 --- a/v2/protocol/http/utility_test.go +++ b/v2/protocol/http/utility_test.go @@ -8,16 +8,19 @@ package http import ( "bytes" "context" + "io" "io/ioutil" "net/http" "net/http/httptest" + "strings" "testing" - "github.com/stretchr/testify/require" - "github.com/cloudevents/sdk-go/v2/binding" "github.com/cloudevents/sdk-go/v2/event" "github.com/cloudevents/sdk-go/v2/test" + "github.com/google/uuid" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" ) func TestNewEventFromHttpRequest(t *testing.T) { @@ -89,3 +92,187 @@ func TestNewEventFromHttpResponse(t *testing.T) { }) } } + +func TestNewEventsFromHTTPRequest(t *testing.T) { + + type expected struct { + len int + ids []string + errorContains string + } + + fixtures := map[string]struct { + jsn string + contentType string + expected expected + }{ + "single": { + jsn: `[{"data":"foo","datacontenttype":"application/json","id":"id","source":"source","specversion":"1.0","type":"type"}]`, + expected: expected{ + len: 1, + ids: []string{"id"}, + }, + }, + "triple": { + jsn: `[{"data":"foo","datacontenttype":"application/json","id":"id1","source":"source","specversion":"1.0","type":"type"},{"data":"foo","datacontenttype":"application/json","id":"id2","source":"source","specversion":"1.0","type":"type"},{"data":"foo","datacontenttype":"application/json","id":"id3","source":"source","specversion":"1.0","type":"type"}]`, + expected: expected{ + len: 3, + ids: []string{"id1", "id2", "id3"}, + }, + }, + "invalid header": { + jsn: `{"data":"foo","datacontenttype":"application/json","id":"id","source":"source","specversion":"1.0","type":"type"}`, + contentType: event.ApplicationJSON, + expected: expected{ + errorContains: "cannot convert message to batched events", + }, + }, + "invalid event": { + jsn: `[{"data":"foo","datacontenttype":"application/json","specversion":"0.1","type":"type"}]`, + expected: expected{ + errorContains: "specversion: unknown value", + }, + }, + } + + for k, v := range fixtures { + t.Run(k, func(t *testing.T) { + req := httptest.NewRequest("POST", "http://localhost", bytes.NewReader([]byte(v.jsn))) + if len(v.contentType) == 0 { + req.Header.Set(ContentType, event.ApplicationCloudEventsBatchJSON) + } else { + req.Header.Set(ContentType, v.contentType) + } + + events, err := NewEventsFromHTTPRequest(req) + if len(v.expected.errorContains) == 0 { + require.NoError(t, err) + require.Len(t, events, v.expected.len) + for i, e := range events { + test.AssertEvent(t, e, test.IsValid()) + require.Equal(t, v.expected.ids[i], events[i].ID()) + } + } else { + require.Error(t, err) + require.ErrorContainsf(t, err, v.expected.errorContains, "error should include message") + } + }) + } +} + +func TestNewEventsFromHTTPResponse(t *testing.T) { + data := `[{"data":"foo","datacontenttype":"application/json","id":"id","source":"source","specversion":"1.0","type":"type"}]` + resp := http.Response{ + Header: http.Header{ + "Content-Type": {event.ApplicationCloudEventsBatchJSON}, + }, + Body: io.NopCloser(bytes.NewReader([]byte(data))), + ContentLength: int64(len(data)), + } + events, err := NewEventsFromHTTPResponse(&resp) + require.NoError(t, err) + require.Len(t, events, 1) + test.AssertEvent(t, events[0], test.IsValid()) +} + +func TestNewHTTPRequestFromEvent(t *testing.T) { + // echo back what we get, so we can compare events at either side. + ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.Header().Set(ContentType, r.Header.Get(ContentType)) + // copy across structured headers + for k, v := range r.Header { + if strings.HasPrefix(k, "Ce-") { + w.Header()[k] = v + } + } + + b, err := io.ReadAll(r.Body) + require.NoError(t, err) + require.NoError(t, r.Body.Close()) + _, err = w.Write(b) + require.NoError(t, err) + })) + defer ts.Close() + + t.Run("valid event", func(t *testing.T) { + e := event.New() + e.SetID(uuid.New().String()) + e.SetSource("example/uri") + e.SetType("example.type") + require.NoError(t, e.SetData(event.ApplicationJSON, map[string]string{"hello": "world"})) + + req, err := NewHTTPRequestFromEvent(context.Background(), ts.URL, e) + require.NoError(t, err) + + resp, err := ts.Client().Do(req) + require.NoError(t, err) + + result, err := NewEventFromHTTPResponse(resp) + require.NoError(t, err) + require.Equal(t, &e, result) + }) + + t.Run("invalid event", func(t *testing.T) { + e := event.New() + _, err := NewHTTPRequestFromEvent(context.Background(), ts.URL, e) + require.ErrorContains(t, err, "id: MUST be a non-empty string") + }) +} + +func TestNewHTTPRequestFromEvents(t *testing.T) { + // echo back what we get, so we can compare events at either side. + ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.Header().Set(ContentType, r.Header.Get(ContentType)) + b, err := io.ReadAll(r.Body) + require.NoError(t, err) + require.NoError(t, r.Body.Close()) + _, err = w.Write(b) + require.NoError(t, err) + })) + defer ts.Close() + + t.Run("valid events", func(t *testing.T) { + var events []event.Event + e := event.New() + e.SetID(uuid.New().String()) + e.SetSource("example/uri") + e.SetType("example.type") + require.NoError(t, e.SetData(event.ApplicationJSON, map[string]string{"hello": "world"})) + events = append(events, e.Clone()) + + e.SetID(uuid.New().String()) + require.NoError(t, e.SetData(event.ApplicationJSON, map[string]string{"goodbye": "world"})) + events = append(events, e) + + require.Len(t, events, 2) + require.NotEqual(t, events[0], events[1]) + + req, err := NewHTTPRequestFromEvents(context.Background(), ts.URL, events) + require.NoError(t, err) + + resp, err := ts.Client().Do(req) + require.NoError(t, err) + + result, err := NewEventsFromHTTPResponse(resp) + require.NoError(t, err) + require.Equal(t, events, result) + }) + + t.Run("invalid events", func(t *testing.T) { + events := []event.Event{event.New()} + _, err := NewHTTPRequestFromEvents(context.Background(), ts.URL, events) + require.ErrorContains(t, err, "id: MUST be a non-empty string") + }) + +} + +func TestIsHTTPBatch(t *testing.T) { + header := http.Header{} + assert.False(t, IsHTTPBatch(header)) + + header.Set(ContentType, event.ApplicationJSON) + assert.False(t, IsHTTPBatch(header)) + + header.Set(ContentType, event.ApplicationCloudEventsBatchJSON) + assert.True(t, IsHTTPBatch(header)) +}