Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[0.18] Backport #4466 #4471

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 14 additions & 13 deletions pkg/mtbroker/filter/filter_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -213,20 +213,11 @@ func (h *Handler) send(ctx context.Context, writer http.ResponseWriter, headers
}

// If there is an event in the response write it to the response
statusCode, err := writeResponse(ctx, writer, response, ttl)
statusCode, err := h.writeResponse(ctx, writer, response, ttl, target)
if err != nil {
h.logger.Error("failed to write response", zap.Error(err))
// Ok, so writeResponse will return the HttpStatus of the function. That may have
// succeeded (200), but it may have returned a malformed event, so if the
// function succeeded, convert this to an StatusBadGateway instead to indicate
// error. Note that we could just use StatusInternalServerError, but to distinguish
// between the two failure cases, we use a different code here.
if statusCode == 200 {
statusCode = http.StatusBadGateway
}
}
_ = h.reporter.ReportEventCount(reportArgs, statusCode)
writer.WriteHeader(statusCode)
}

func (h *Handler) sendEvent(ctx context.Context, headers http.Header, target string, event *cloudevents.Event, reporterArgs *ReportArgs) (*http.Response, error) {
Expand Down Expand Up @@ -262,7 +253,8 @@ func (h *Handler) sendEvent(ctx context.Context, headers http.Header, target str
return resp, err
}

func writeResponse(ctx context.Context, writer http.ResponseWriter, resp *http.Response, ttl int32) (int, error) {
// The return values are the status
func (h *Handler) writeResponse(ctx context.Context, writer http.ResponseWriter, resp *http.Response, ttl int32, target string) (int, error) {
response := cehttp.NewMessageFromHttpResponse(resp)
defer response.Finish(nil)

Expand All @@ -275,19 +267,28 @@ func writeResponse(ctx context.Context, writer http.ResponseWriter, resp *http.R
n, _ := response.BodyReader.Read(body)
response.BodyReader.Close()
if n != 0 {
return resp.StatusCode, errors.New("received a non-empty response not recognized as CloudEvent. The response MUST be or empty or a valid CloudEvent")
// Note that we could just use StatusInternalServerError, but to distinguish
// between the failure cases, we use a different code here.
writer.WriteHeader(http.StatusBadGateway)
return http.StatusBadGateway, errors.New("received a non-empty response not recognized as CloudEvent. The response MUST be or empty or a valid CloudEvent")
}
h.logger.Debug("Response doesn't contain a CloudEvent, replying with an empty response", zap.Any("target", target))
writer.WriteHeader(resp.StatusCode)
return resp.StatusCode, nil
}

event, err := binding.ToEvent(ctx, response)
if err != nil {
// Like in the above case, we could just use StatusInternalServerError, but to distinguish
// between the failure cases, we use a different code here.
writer.WriteHeader(http.StatusBadGateway)
// Malformed event, reply with err
return resp.StatusCode, err
return http.StatusBadGateway, err
}

// Reattach the TTL (with the same value) to the response event before sending it to the Broker.
if err := broker.SetTTL(event.Context, ttl); err != nil {
writer.WriteHeader(http.StatusInternalServerError)
return http.StatusInternalServerError, fmt.Errorf("failed to reset TTL: %w", err)
}

Expand Down
95 changes: 81 additions & 14 deletions pkg/mtbroker/filter/filter_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,15 +32,17 @@ import (
"github.com/cloudevents/sdk-go/v2/event"
cehttp "github.com/cloudevents/sdk-go/v2/protocol/http"
"github.com/google/go-cmp/cmp"
"go.uber.org/atomic"
"go.uber.org/zap"
"go.uber.org/zap/zaptest"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/kubernetes/scheme"
"knative.dev/pkg/apis"

eventingv1beta1 "knative.dev/eventing/pkg/apis/eventing/v1beta1"
broker "knative.dev/eventing/pkg/mtbroker"
reconcilertesting "knative.dev/eventing/pkg/reconciler/testing"
"knative.dev/pkg/apis"
)

const (
Expand Down Expand Up @@ -277,16 +279,36 @@ func TestReceiver(t *testing.T) {
expectedEventDispatchTime: true,
returnedEvent: makeDifferentEvent(),
},
"Returned malformed Cloud Event": {
"Returned non empty non event response": {
triggers: []*eventingv1beta1.Trigger{
makeTrigger(makeTriggerFilterWithAttributes("", "")),
},
expectedDispatch: true,
expectedEventCount: true,
expectedEventDispatchTime: true,
expectedStatus: http.StatusBadGateway,
response: makeNonEmptyResponse(),
},
"Returned malformed Cloud Event": {
triggers: []*eventingv1beta1.Trigger{
makeTrigger(makeTriggerFilterWithAttributes("", "")),
},
expectedDispatch: true,
expectedEventCount: true,
expectedEventDispatchTime: true,
expectedStatus: http.StatusOK,
response: makeMalformedEventResponse(),
},
"Returned malformed structured Cloud Event": {
triggers: []*eventingv1beta1.Trigger{
makeTrigger(makeTriggerFilterWithAttributes("", "")),
},
expectedDispatch: true,
expectedEventCount: true,
expectedEventDispatchTime: true,
expectedStatus: http.StatusBadGateway,
response: makeMalformedStructuredEventResponse(),
},
"Returned empty body 200": {
triggers: []*eventingv1beta1.Trigger{
makeTrigger(makeTriggerFilterWithAttributes("", "")),
Expand Down Expand Up @@ -364,7 +386,11 @@ func TestReceiver(t *testing.T) {
tc.request.Header.Set(cehttp.ContentType, event.ApplicationCloudEventsJSON)
}
responseWriter := httptest.NewRecorder()
r.ServeHTTP(responseWriter, tc.request)
r.ServeHTTP(&responseWriterWithInvocationsCheck{
ResponseWriter: responseWriter,
headersWritten: atomic.NewBool(false),
t: t,
}, tc.request)

response := responseWriter.Result()

Expand Down Expand Up @@ -425,6 +451,19 @@ func TestReceiver(t *testing.T) {
}
}

type responseWriterWithInvocationsCheck struct {
http.ResponseWriter
headersWritten *atomic.Bool
t *testing.T
}

func (r *responseWriterWithInvocationsCheck) WriteHeader(statusCode int) {
if !r.headersWritten.CAS(false, true) {
r.t.Fatal("WriteHeader invoked more than once")
}
r.ResponseWriter.WriteHeader(statusCode)
}

type mockReporter struct {
eventCountReported bool
eventDispatchTimeReported bool
Expand Down Expand Up @@ -493,15 +532,18 @@ func (h *fakeHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
}
}
if h.response != nil {
defer h.response.Body.Close()
body, err := ioutil.ReadAll(h.response.Body)
if err != nil {
h.t.Fatalf("Unable to read body: %v", err)
for k, v := range h.response.Header {
resp.Header().Set(k, v[0])
}
resp.WriteHeader(h.response.StatusCode)
resp.Header().Set("Content-Type", "garbage")
resp.Header().Set("Content-Length", fmt.Sprintf("%d", len(body)))
resp.Write(body)
if h.response.Body != nil {
defer h.response.Body.Close()
body, err := ioutil.ReadAll(h.response.Body)
if err != nil {
h.t.Fatal("Unable to read body: ", err)
}
resp.Write(body)
}
}
}

Expand Down Expand Up @@ -589,7 +631,7 @@ func makeEventWithExtension(extName, extValue string) *cloudevents.Event {
return &e
}

func makeMalformedEventResponse() *http.Response {
func makeNonEmptyResponse() *http.Response {
r := &http.Response{
Status: "200 OK",
StatusCode: 200,
Expand All @@ -604,6 +646,34 @@ func makeMalformedEventResponse() *http.Response {
return r
}

func makeMalformedEventResponse() *http.Response {
r := &http.Response{
Status: "200 OK",
StatusCode: 200,
Proto: "HTTP/1.1",
ProtoMajor: 1,
ProtoMinor: 1,
Header: make(http.Header),
}
r.Header.Set("Ce-Specversion", "9000.1")
return r
}

func makeMalformedStructuredEventResponse() *http.Response {
r := &http.Response{
Status: "200 OK",
StatusCode: 200,
Proto: "HTTP/1.1",
ProtoMajor: 1,
ProtoMinor: 1,
Body: ioutil.NopCloser(bytes.NewReader([]byte("{}"))),
Header: make(http.Header),
}
r.Header.Set("Content-Type", cloudevents.ApplicationCloudEventsJSON)

return r
}

func makeEmptyResponse(status int) *http.Response {
s := fmt.Sprintf("%d OK", status)
r := &http.Response{
Expand All @@ -612,10 +682,7 @@ func makeEmptyResponse(status int) *http.Response {
Proto: "HTTP/1.1",
ProtoMajor: 1,
ProtoMinor: 1,
Body: ioutil.NopCloser(bytes.NewBufferString("")),
Header: make(http.Header),
}
r.Header.Set("Content-Type", "garbage")
r.Header.Set("Content-Length", "0")
return r
}