Skip to content
This repository has been archived by the owner on Jun 19, 2022. It is now read-only.

Handle requests with very large payload #1772

Merged
merged 5 commits into from
Sep 29, 2020
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 8 additions & 1 deletion pkg/broker/ingress/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,9 @@ const (
// TODO(liu-cong) configurable timeout
decoupleSinkTimeout = 30 * time.Second

// Limit for request payload in bytes (100Mb)
maxRequestBodyBytes = 100000000
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let me know if we'd rather have this as an env variable.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you document how this number is picked? Is this the same as the pubsub message size limit?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure, I have updated the description.


// EventArrivalTime is used to access the metadata stored on a
// CloudEvent to measure the time difference between when an event is
// received on a broker and before it is dispatched to the trigger function.
Expand Down Expand Up @@ -119,7 +122,6 @@ func (h *Handler) ServeHTTP(response nethttp.ResponseWriter, request *nethttp.Re
response.WriteHeader(nethttp.StatusOK)
return
}

ctx := request.Context()
ctx = logging.WithLogger(ctx, h.logger)
ctx = tracing.WithLogging(ctx, trace.FromContext(ctx))
Expand All @@ -129,6 +131,11 @@ func (h *Handler) ServeHTTP(response nethttp.ResponseWriter, request *nethttp.Re
return
}

if request.ContentLength > maxRequestBodyBytes {
response.WriteHeader(nethttp.StatusRequestEntityTooLarge)
return
}

broker, err := ConvertPathToNamespacedName(request.URL.Path)
ctx = logging.With(ctx, zap.Stringer("broker", broker))
if err != nil {
Expand Down
17 changes: 17 additions & 0 deletions pkg/broker/ingress/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ import (
"testing"
"time"

"math/rand"

"cloud.google.com/go/pubsub"
"cloud.google.com/go/pubsub/pstest"
cepubsub "github.com/cloudevents/sdk-go/protocol/pubsub/v2"
Expand Down Expand Up @@ -175,6 +177,13 @@ func TestHandler(t *testing.T) {
event: createTestEvent("test-event"),
wantCode: nethttp.StatusMethodNotAllowed,
},
{
name: "an event with a very large payload",
method: "POST",
path: "/ns1/broker1",
event: createTestEventWithPayloadSize("test-event", 110000000),
wantCode: nethttp.StatusRequestEntityTooLarge,
},
{
name: "malformed path",
path: "/ns1/broker1/and/something/else",
Expand Down Expand Up @@ -466,6 +475,14 @@ func createTestEvent(id string) *cloudevents.Event {
return &event
}

func createTestEventWithPayloadSize(id string, payloadSizeBytes int) *cloudevents.Event {
testEvent := createTestEvent(id)
payload := make([]byte, payloadSizeBytes)
rand.Read(payload)
testEvent.SetData("application/octet-stream", payload)
return testEvent
}

// createRequest creates an http request from the test case. If event is specified, it converts the event to a request.
func createRequest(tc testCase, url string) *nethttp.Request {
method := "POST"
Expand Down