Skip to content

Commit

Permalink
http: Batch Events from HTTP Request and Response
Browse files Browse the repository at this point in the history
Simple implementations of NewEventsFromHTTPRequest and
NewEventsFromHTTPResponse as a step towards providing some support for
`application/cloudevents-batch+json`.

This doesn't help with batch sending of Events, but this provides some
basics of being able to process batch requests and/or responses
from standard Go HTTP handling.

This could be improved over time (See previous work on #301) as well,
but optimised for having something that works to start, rather than
implementing big design changes.

Signed-off-by: Mark Mandel <markmandel@google.com>
  • Loading branch information
markmandel committed Jan 6, 2023
1 parent 2298be0 commit e9cfad8
Show file tree
Hide file tree
Showing 7 changed files with 89 additions and 2 deletions.
6 changes: 4 additions & 2 deletions v2/alias.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,8 +135,10 @@ var (
ToMessage = binding.ToMessage

// Event Creation
NewEventFromHTTPRequest = http.NewEventFromHTTPRequest
NewEventFromHTTPResponse = http.NewEventFromHTTPResponse
NewEventFromHTTPRequest = http.NewEventFromHTTPRequest
NewEventFromHTTPResponse = http.NewEventFromHTTPResponse
NewEventsFromHTTPRequest = http.NewEventsFromHTTPRequest
NewEventsFromHTTPResponse = http.NewEventsFromHTTPResponse

// HTTP Messages

Expand Down
5 changes: 5 additions & 0 deletions v2/binding/encoding.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@ const (
EncodingEvent
// When the encoding is unknown (which means that the message is a non-event)
EncodingUnknown

// EncodingBatch is an instance of JSON Batched Events
EncodingBatch
)

func (e Encoding) String() string {
Expand All @@ -29,6 +32,8 @@ func (e Encoding) String() string {
return "structured"
case EncodingEvent:
return "event"
case EncodingBatch:
return "batch"
case EncodingUnknown:
return "unknown"
}
Expand Down
19 changes: 19 additions & 0 deletions v2/binding/format/format.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ package format

import (
"encoding/json"
"errors"
"fmt"
"strings"

Expand Down Expand Up @@ -41,12 +42,30 @@ func (jsonFmt) Unmarshal(b []byte, e *event.Event) error {
return json.Unmarshal(b, e)
}

// JSONBatch is the built-in "application/cloudevents-batch+json" format.
var JSONBatch = jsonBatchFmt{}

type jsonBatchFmt struct{}

func (jb jsonBatchFmt) MediaType() string {
return event.ApplicationCloudEventsBatchJSON
}

func (jb jsonBatchFmt) Marshal(e *event.Event) ([]byte, error) {
return nil, errors.New("not supported for batch events")
}

func (jb jsonBatchFmt) Unmarshal(b []byte, e *event.Event) error {
return errors.New("not supported for batch events")
}

// built-in formats
var formats map[string]Format

func init() {
formats = map[string]Format{}
Add(JSON)
Add(JSONBatch)
}

// Lookup returns the format for contentType, or nil if not found.
Expand Down
20 changes: 20 additions & 0 deletions v2/binding/to_event.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ package binding
import (
"bytes"
"context"
"encoding/json"
"errors"
"fmt"
"io"
Expand All @@ -21,6 +22,9 @@ import (
// ErrCannotConvertToEvent is a generic error when a conversion of a Message to an Event fails
var ErrCannotConvertToEvent = errors.New("cannot convert message to event")

// ErrCannotConvertToEvents is a generic error when a conversion of a Message to a Batched Event fails
var ErrCannotConvertToEvents = errors.New("cannot convert message to batched events")

// ToEvent translates a Message with a valid Structured or Binary representation to an Event.
// This function returns the Event generated from the Message and the original encoding of the message or
// an error that points the conversion error.
Expand Down Expand Up @@ -61,6 +65,22 @@ func ToEvent(ctx context.Context, message MessageReader, transformers ...Transfo
return &e, Transformers(transformers).Transform((*EventMessage)(&e), encoder)
}

// ToEvents translates a Batch Message and corresponding Reader data to a slice of Events.
// This function returns the Events generated from the body data, or an error that points
// to the conversion issue.
func ToEvents(ctx context.Context, message MessageReader, body io.Reader) ([]event.Event, error) {
messageEncoding := message.ReadEncoding()
if messageEncoding != EncodingBatch {
return nil, ErrCannotConvertToEvents
}

// Since Format doesn't support batch Marshalling, and we know it's structured batch json, we'll go direct to the
// json.UnMarshall(), since that is the best way to support batch operations for now.
var events []event.Event
err := json.NewDecoder(body).Decode(&events)
return events, err
}

type messageToEventBuilder event.Event

var _ StructuredWriter = (*messageToEventBuilder)(nil)
Expand Down
3 changes: 3 additions & 0 deletions v2/protocol/http/message.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,9 @@ func (m *Message) ReadEncoding() binding.Encoding {
return binding.EncodingBinary
}
if m.format != nil {
if m.format == format.JSONBatch {
return binding.EncodingBatch
}
return binding.EncodingStructured
}
return binding.EncodingUnknown
Expand Down
12 changes: 12 additions & 0 deletions v2/protocol/http/utility.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,3 +24,15 @@ func NewEventFromHTTPResponse(resp *nethttp.Response) (*event.Event, error) {
msg := NewMessageFromHttpResponse(resp)
return binding.ToEvent(context.Background(), msg)
}

// NewEventsFromHTTPRequest returns a batched set of Events from a http.Request
func NewEventsFromHTTPRequest(req *nethttp.Request) ([]event.Event, error) {
msg := NewMessageFromHttpRequest(req)
return binding.ToEvents(context.Background(), msg, msg.BodyReader)
}

// NewEventsFromHTTPResponse returns a batched set of Events from a http.Response
func NewEventsFromHTTPResponse(resp *nethttp.Response) ([]event.Event, error) {
msg := NewMessageFromHttpResponse(resp)
return binding.ToEvents(context.Background(), msg, msg.BodyReader)
}
26 changes: 26 additions & 0 deletions v2/protocol/http/utility_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ package http
import (
"bytes"
"context"
"io"
"io/ioutil"
"net/http"
"net/http/httptest"
Expand Down Expand Up @@ -89,3 +90,28 @@ func TestNewEventFromHttpResponse(t *testing.T) {
})
}
}

func TestNewEventsFromHTTPRequest(t *testing.T) {
req := httptest.NewRequest("POST", "http://localhost", bytes.NewReader([]byte(`[{"data":"foo","datacontenttype":"application/json","id":"id","source":"source","specversion":"1.0","type":"type"}]`)))
req.Header.Set(ContentType, event.ApplicationCloudEventsBatchJSON)

events, err := NewEventsFromHTTPRequest(req)
require.NoError(t, err)
require.Len(t, events, 1)
test.AssertEvent(t, events[0], test.IsValid())
}

func TestNewEventsFromHTTPResponse(t *testing.T) {
data := `[{"data":"foo","datacontenttype":"application/json","id":"id","source":"source","specversion":"1.0","type":"type"}]`
resp := http.Response{
Header: http.Header{
"Content-Type": {event.ApplicationCloudEventsBatchJSON},
},
Body: io.NopCloser(bytes.NewReader([]byte(data))),
ContentLength: int64(len(data)),
}
events, err := NewEventsFromHTTPResponse(&resp)
require.NoError(t, err)
require.Len(t, events, 1)
test.AssertEvent(t, events[0], test.IsValid())
}

0 comments on commit e9cfad8

Please sign in to comment.