Skip to content

Commit

Permalink
Remove double invocations to responseWriter.WriteHeader in filter han…
Browse files Browse the repository at this point in the history
…dler (knative#4466)

* Fix knative#4464

Signed-off-by: Francesco Guardiani <francescoguard@gmail.com>

* Docs

Signed-off-by: Francesco Guardiani <francescoguard@gmail.com>

* Moar tests

Signed-off-by: Francesco Guardiani <francescoguard@gmail.com>

* Linting

Signed-off-by: Francesco Guardiani <francescoguard@gmail.com>

* Nit with metrics

Signed-off-by: Francesco Guardiani <francescoguard@gmail.com>

(cherry picked from commit a6fc540)
Signed-off-by: Francesco Guardiani <francescoguard@gmail.com>
  • Loading branch information
slinkydeveloper committed Nov 5, 2020
1 parent 650096a commit c61ccfb
Show file tree
Hide file tree
Showing 2 changed files with 94 additions and 26 deletions.
25 changes: 13 additions & 12 deletions pkg/mtbroker/filter/filter_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -216,17 +216,8 @@ func (h *Handler) send(ctx context.Context, writer http.ResponseWriter, headers
statusCode, err := writeResponse(ctx, writer, response, ttl)
if err != nil {
h.logger.Error("failed to write response", zap.Error(err))
// Ok, so writeResponse will return the HttpStatus of the function. That may have
// succeeded (200), but it may have returned a malformed event, so if the
// function succeeded, convert this to an StatusBadGateway instead to indicate
// error. Note that we could just use StatusInternalServerError, but to distinguish
// between the two failure cases, we use a different code here.
if statusCode == 200 {
statusCode = http.StatusBadGateway
}
}
_ = h.reporter.ReportEventCount(reportArgs, statusCode)
writer.WriteHeader(statusCode)
}

func (h *Handler) sendEvent(ctx context.Context, headers http.Header, target string, event *cloudevents.Event, reporterArgs *ReportArgs) (*http.Response, error) {
Expand Down Expand Up @@ -262,7 +253,8 @@ func (h *Handler) sendEvent(ctx context.Context, headers http.Header, target str
return resp, err
}

func writeResponse(ctx context.Context, writer http.ResponseWriter, resp *http.Response, ttl int32) (int, error) {
// The return values are the status
func (h *Handler) writeResponse(ctx context.Context, writer http.ResponseWriter, resp *http.Response, ttl int32, target string) (int, error) {
response := cehttp.NewMessageFromHttpResponse(resp)
defer response.Finish(nil)

Expand All @@ -275,19 +267,28 @@ func writeResponse(ctx context.Context, writer http.ResponseWriter, resp *http.R
n, _ := response.BodyReader.Read(body)
response.BodyReader.Close()
if n != 0 {
return resp.StatusCode, errors.New("received a non-empty response not recognized as CloudEvent. The response MUST be or empty or a valid CloudEvent")
// Note that we could just use StatusInternalServerError, but to distinguish
// between the failure cases, we use a different code here.
writer.WriteHeader(http.StatusBadGateway)
return http.StatusBadGateway, errors.New("received a non-empty response not recognized as CloudEvent. The response MUST be or empty or a valid CloudEvent")
}
h.logger.Debug("Response doesn't contain a CloudEvent, replying with an empty response", zap.Any("target", target))
writer.WriteHeader(resp.StatusCode)
return resp.StatusCode, nil
}

event, err := binding.ToEvent(ctx, response)
if err != nil {
// Like in the above case, we could just use StatusInternalServerError, but to distinguish
// between the failure cases, we use a different code here.
writer.WriteHeader(http.StatusBadGateway)
// Malformed event, reply with err
return resp.StatusCode, err
return http.StatusBadGateway, err
}

// Reattach the TTL (with the same value) to the response event before sending it to the Broker.
if err := broker.SetTTL(event.Context, ttl); err != nil {
writer.WriteHeader(http.StatusInternalServerError)
return http.StatusInternalServerError, fmt.Errorf("failed to reset TTL: %w", err)
}

Expand Down
95 changes: 81 additions & 14 deletions pkg/mtbroker/filter/filter_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,15 +32,17 @@ import (
"github.com/cloudevents/sdk-go/v2/event"
cehttp "github.com/cloudevents/sdk-go/v2/protocol/http"
"github.com/google/go-cmp/cmp"
"go.uber.org/atomic"
"go.uber.org/zap"
"go.uber.org/zap/zaptest"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/kubernetes/scheme"
"knative.dev/pkg/apis"

eventingv1beta1 "knative.dev/eventing/pkg/apis/eventing/v1beta1"
broker "knative.dev/eventing/pkg/mtbroker"
reconcilertesting "knative.dev/eventing/pkg/reconciler/testing"
"knative.dev/pkg/apis"
)

const (
Expand Down Expand Up @@ -277,16 +279,36 @@ func TestReceiver(t *testing.T) {
expectedEventDispatchTime: true,
returnedEvent: makeDifferentEvent(),
},
"Returned malformed Cloud Event": {
"Returned non empty non event response": {
triggers: []*eventingv1beta1.Trigger{
makeTrigger(makeTriggerFilterWithAttributes("", "")),
},
expectedDispatch: true,
expectedEventCount: true,
expectedEventDispatchTime: true,
expectedStatus: http.StatusBadGateway,
response: makeNonEmptyResponse(),
},
"Returned malformed Cloud Event": {
triggers: []*eventingv1beta1.Trigger{
makeTrigger(makeTriggerFilterWithAttributes("", "")),
},
expectedDispatch: true,
expectedEventCount: true,
expectedEventDispatchTime: true,
expectedStatus: http.StatusOK,
response: makeMalformedEventResponse(),
},
"Returned malformed structured Cloud Event": {
triggers: []*eventingv1beta1.Trigger{
makeTrigger(makeTriggerFilterWithAttributes("", "")),
},
expectedDispatch: true,
expectedEventCount: true,
expectedEventDispatchTime: true,
expectedStatus: http.StatusBadGateway,
response: makeMalformedStructuredEventResponse(),
},
"Returned empty body 200": {
triggers: []*eventingv1beta1.Trigger{
makeTrigger(makeTriggerFilterWithAttributes("", "")),
Expand Down Expand Up @@ -364,7 +386,11 @@ func TestReceiver(t *testing.T) {
tc.request.Header.Set(cehttp.ContentType, event.ApplicationCloudEventsJSON)
}
responseWriter := httptest.NewRecorder()
r.ServeHTTP(responseWriter, tc.request)
r.ServeHTTP(&responseWriterWithInvocationsCheck{
ResponseWriter: responseWriter,
headersWritten: atomic.NewBool(false),
t: t,
}, tc.request)

response := responseWriter.Result()

Expand Down Expand Up @@ -425,6 +451,19 @@ func TestReceiver(t *testing.T) {
}
}

type responseWriterWithInvocationsCheck struct {
http.ResponseWriter
headersWritten *atomic.Bool
t *testing.T
}

func (r *responseWriterWithInvocationsCheck) WriteHeader(statusCode int) {
if !r.headersWritten.CAS(false, true) {
r.t.Fatal("WriteHeader invoked more than once")
}
r.ResponseWriter.WriteHeader(statusCode)
}

type mockReporter struct {
eventCountReported bool
eventDispatchTimeReported bool
Expand Down Expand Up @@ -493,15 +532,18 @@ func (h *fakeHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
}
}
if h.response != nil {
defer h.response.Body.Close()
body, err := ioutil.ReadAll(h.response.Body)
if err != nil {
h.t.Fatalf("Unable to read body: %v", err)
for k, v := range h.response.Header {
resp.Header().Set(k, v[0])
}
resp.WriteHeader(h.response.StatusCode)
resp.Header().Set("Content-Type", "garbage")
resp.Header().Set("Content-Length", fmt.Sprintf("%d", len(body)))
resp.Write(body)
if h.response.Body != nil {
defer h.response.Body.Close()
body, err := ioutil.ReadAll(h.response.Body)
if err != nil {
h.t.Fatal("Unable to read body: ", err)
}
resp.Write(body)
}
}
}

Expand Down Expand Up @@ -589,7 +631,7 @@ func makeEventWithExtension(extName, extValue string) *cloudevents.Event {
return &e
}

func makeMalformedEventResponse() *http.Response {
func makeNonEmptyResponse() *http.Response {
r := &http.Response{
Status: "200 OK",
StatusCode: 200,
Expand All @@ -604,6 +646,34 @@ func makeMalformedEventResponse() *http.Response {
return r
}

func makeMalformedEventResponse() *http.Response {
r := &http.Response{
Status: "200 OK",
StatusCode: 200,
Proto: "HTTP/1.1",
ProtoMajor: 1,
ProtoMinor: 1,
Header: make(http.Header),
}
r.Header.Set("Ce-Specversion", "9000.1")
return r
}

func makeMalformedStructuredEventResponse() *http.Response {
r := &http.Response{
Status: "200 OK",
StatusCode: 200,
Proto: "HTTP/1.1",
ProtoMajor: 1,
ProtoMinor: 1,
Body: ioutil.NopCloser(bytes.NewReader([]byte("{}"))),
Header: make(http.Header),
}
r.Header.Set("Content-Type", cloudevents.ApplicationCloudEventsJSON)

return r
}

func makeEmptyResponse(status int) *http.Response {
s := fmt.Sprintf("%d OK", status)
r := &http.Response{
Expand All @@ -612,10 +682,7 @@ func makeEmptyResponse(status int) *http.Response {
Proto: "HTTP/1.1",
ProtoMajor: 1,
ProtoMinor: 1,
Body: ioutil.NopCloser(bytes.NewBufferString("")),
Header: make(http.Header),
}
r.Header.Set("Content-Type", "garbage")
r.Header.Set("Content-Length", "0")
return r
}

0 comments on commit c61ccfb

Please sign in to comment.