diff --git a/v2/alias.go b/v2/alias.go index ed64b4c0c..47dad6a55 100644 --- a/v2/alias.go +++ b/v2/alias.go @@ -135,8 +135,10 @@ var ( ToMessage = binding.ToMessage // Event Creation - NewEventFromHTTPRequest = http.NewEventFromHTTPRequest - NewEventFromHTTPResponse = http.NewEventFromHTTPResponse + NewEventFromHTTPRequest = http.NewEventFromHTTPRequest + NewEventFromHTTPResponse = http.NewEventFromHTTPResponse + NewEventsFromHTTPRequest = http.NewEventsFromHTTPRequest + NewEventsFromHTTPResponse = http.NewEventsFromHTTPResponse // HTTP Messages diff --git a/v2/binding/encoding.go b/v2/binding/encoding.go index 16611a3d7..5070b7295 100644 --- a/v2/binding/encoding.go +++ b/v2/binding/encoding.go @@ -19,6 +19,9 @@ const ( EncodingEvent // When the encoding is unknown (which means that the message is a non-event) EncodingUnknown + + // EncodingBatch is an instance of JSON Batched Events + EncodingBatch ) func (e Encoding) String() string { @@ -29,6 +32,8 @@ func (e Encoding) String() string { return "structured" case EncodingEvent: return "event" + case EncodingBatch: + return "batch" case EncodingUnknown: return "unknown" } diff --git a/v2/binding/format/format.go b/v2/binding/format/format.go index 2d840025e..4db6db948 100644 --- a/v2/binding/format/format.go +++ b/v2/binding/format/format.go @@ -7,6 +7,7 @@ package format import ( "encoding/json" + "errors" "fmt" "strings" @@ -41,12 +42,30 @@ func (jsonFmt) Unmarshal(b []byte, e *event.Event) error { return json.Unmarshal(b, e) } +// JSONBatch is the built-in "application/cloudevents-batch+json" format. +var JSONBatch = jsonBatchFmt{} + +type jsonBatchFmt struct{} + +func (jb jsonBatchFmt) MediaType() string { + return event.ApplicationCloudEventsBatchJSON +} + +func (jb jsonBatchFmt) Marshal(e *event.Event) ([]byte, error) { + return nil, errors.New("not supported for batch events") +} + +func (jb jsonBatchFmt) Unmarshal(b []byte, e *event.Event) error { + return errors.New("not supported for batch events") +} + // built-in formats var formats map[string]Format func init() { formats = map[string]Format{} Add(JSON) + Add(JSONBatch) } // Lookup returns the format for contentType, or nil if not found. diff --git a/v2/binding/to_event.go b/v2/binding/to_event.go index 339a7833c..e6cfaacd2 100644 --- a/v2/binding/to_event.go +++ b/v2/binding/to_event.go @@ -8,6 +8,7 @@ package binding import ( "bytes" "context" + "encoding/json" "errors" "fmt" "io" @@ -21,6 +22,9 @@ import ( // ErrCannotConvertToEvent is a generic error when a conversion of a Message to an Event fails var ErrCannotConvertToEvent = errors.New("cannot convert message to event") +// ErrCannotConvertToEvents is a generic error when a conversion of a Message to a Batched Event fails +var ErrCannotConvertToEvents = errors.New("cannot convert message to batched events") + // ToEvent translates a Message with a valid Structured or Binary representation to an Event. // This function returns the Event generated from the Message and the original encoding of the message or // an error that points the conversion error. @@ -61,6 +65,22 @@ func ToEvent(ctx context.Context, message MessageReader, transformers ...Transfo return &e, Transformers(transformers).Transform((*EventMessage)(&e), encoder) } +// ToEvents translates a Batch Message and corresponding Reader data to a slice of Events. +// This function returns the Events generated from the body data, or an error that points +// to the conversion issue. +func ToEvents(ctx context.Context, message MessageReader, body io.Reader) ([]event.Event, error) { + messageEncoding := message.ReadEncoding() + if messageEncoding != EncodingBatch { + return nil, ErrCannotConvertToEvents + } + + // Since Format doesn't support batch Marshalling, and we know it's structured batch json, we'll go direct to the + // json.UnMarshall(), since that is the best way to support batch operations for now. + var events []event.Event + err := json.NewDecoder(body).Decode(&events) + return events, err +} + type messageToEventBuilder event.Event var _ StructuredWriter = (*messageToEventBuilder)(nil) diff --git a/v2/protocol/http/message.go b/v2/protocol/http/message.go index e7e51d034..7a7c36f9b 100644 --- a/v2/protocol/http/message.go +++ b/v2/protocol/http/message.go @@ -92,6 +92,9 @@ func (m *Message) ReadEncoding() binding.Encoding { return binding.EncodingBinary } if m.format != nil { + if m.format == format.JSONBatch { + return binding.EncodingBatch + } return binding.EncodingStructured } return binding.EncodingUnknown diff --git a/v2/protocol/http/utility.go b/v2/protocol/http/utility.go index d46a33461..056ebe677 100644 --- a/v2/protocol/http/utility.go +++ b/v2/protocol/http/utility.go @@ -24,3 +24,15 @@ func NewEventFromHTTPResponse(resp *nethttp.Response) (*event.Event, error) { msg := NewMessageFromHttpResponse(resp) return binding.ToEvent(context.Background(), msg) } + +// NewEventsFromHTTPRequest returns a batched set of Events from a http.Request +func NewEventsFromHTTPRequest(req *nethttp.Request) ([]event.Event, error) { + msg := NewMessageFromHttpRequest(req) + return binding.ToEvents(context.Background(), msg, msg.BodyReader) +} + +// NewEventsFromHTTPResponse returns a batched set of Events from a http.Response +func NewEventsFromHTTPResponse(resp *nethttp.Response) ([]event.Event, error) { + msg := NewMessageFromHttpResponse(resp) + return binding.ToEvents(context.Background(), msg, msg.BodyReader) +} diff --git a/v2/protocol/http/utility_test.go b/v2/protocol/http/utility_test.go index 992751cb4..3869ceafd 100644 --- a/v2/protocol/http/utility_test.go +++ b/v2/protocol/http/utility_test.go @@ -8,6 +8,7 @@ package http import ( "bytes" "context" + "io" "io/ioutil" "net/http" "net/http/httptest" @@ -89,3 +90,28 @@ func TestNewEventFromHttpResponse(t *testing.T) { }) } } + +func TestNewEventsFromHTTPRequest(t *testing.T) { + req := httptest.NewRequest("POST", "http://localhost", bytes.NewReader([]byte(`[{"data":"foo","datacontenttype":"application/json","id":"id","source":"source","specversion":"1.0","type":"type"}]`))) + req.Header.Set(ContentType, event.ApplicationCloudEventsBatchJSON) + + events, err := NewEventsFromHTTPRequest(req) + require.NoError(t, err) + require.Len(t, events, 1) + test.AssertEvent(t, events[0], test.IsValid()) +} + +func TestNewEventsFromHTTPResponse(t *testing.T) { + data := `[{"data":"foo","datacontenttype":"application/json","id":"id","source":"source","specversion":"1.0","type":"type"}]` + resp := http.Response{ + Header: http.Header{ + "Content-Type": {event.ApplicationCloudEventsBatchJSON}, + }, + Body: io.NopCloser(bytes.NewReader([]byte(data))), + ContentLength: int64(len(data)), + } + events, err := NewEventsFromHTTPResponse(&resp) + require.NoError(t, err) + require.Len(t, events, 1) + test.AssertEvent(t, events[0], test.IsValid()) +}