Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
Signed-off-by: Francesco Guardiani <francescoguard@gmail.com>
  • Loading branch information
slinkydeveloper committed Nov 5, 2020
1 parent 7bf5d1c commit cdb060b
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 11 deletions.
20 changes: 11 additions & 9 deletions pkg/mtbroker/filter/filter_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -219,17 +219,8 @@ func (h *Handler) send(ctx context.Context, writer http.ResponseWriter, headers
statusCode, err := h.writeResponse(ctx, writer, response, ttl, target)
if err != nil {
h.logger.Error("failed to write response", zap.Error(err))
// Ok, so writeResponse will return the HTTPStatus of the function. That may have
// succeeded (200), but it may have returned a malformed event, so if the
// function succeeded, convert this to an StatusBadGateway instead to indicate
// error. Note that we could just use StatusInternalServerError, but to distinguish
// between the two failure cases, we use a different code here.
if statusCode == http.StatusOK {
statusCode = http.StatusBadGateway
}
}
_ = h.reporter.ReportEventCount(reportArgs, statusCode)
writer.WriteHeader(statusCode)
}

func (h *Handler) sendEvent(ctx context.Context, headers http.Header, target string, event *cloudevents.Event, reporterArgs *ReportArgs) (*http.Response, error) {
Expand Down Expand Up @@ -265,6 +256,7 @@ func (h *Handler) sendEvent(ctx context.Context, headers http.Header, target str
return resp, err
}

// The return values are the status
func (h *Handler) writeResponse(ctx context.Context, writer http.ResponseWriter, resp *http.Response, ttl int32, target string) (int, error) {
response := cehttp.NewMessageFromHttpResponse(resp)
defer response.Finish(nil)
Expand All @@ -278,20 +270,30 @@ func (h *Handler) writeResponse(ctx context.Context, writer http.ResponseWriter,
n, _ := response.BodyReader.Read(body)
response.BodyReader.Close()
if n != 0 {
// Note that we could just use StatusInternalServerError, but to distinguish
// between the two failure cases, we use a different code here.
writer.WriteHeader(http.StatusBadGateway)
return resp.StatusCode, errors.New("received a non-empty response not recognized as CloudEvent. The response MUST be or empty or a valid CloudEvent")
}
h.logger.Debug("Response doesn't contain a CloudEvent, replying with an empty response", zap.Any("target", target))
writer.WriteHeader(resp.StatusCode)
return resp.StatusCode, nil
}

event, err := binding.ToEvent(ctx, response)
if err != nil {
// Note that we could just use StatusInternalServerError, but to distinguish
// between the two failure cases, we use a different code here.
writer.WriteHeader(http.StatusBadGateway)
// Malformed event, reply with err
return resp.StatusCode, err
}

// Reattach the TTL (with the same value) to the response event before sending it to the Broker.
if err := broker.SetTTL(event.Context, ttl); err != nil {
// Note that we could just use StatusInternalServerError, but to distinguish
// between the two failure cases, we use a different code here.
writer.WriteHeader(http.StatusInternalServerError)
return http.StatusInternalServerError, fmt.Errorf("failed to reset TTL: %w", err)
}

Expand Down
23 changes: 21 additions & 2 deletions pkg/mtbroker/filter/filter_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,15 +32,17 @@ import (
"github.com/cloudevents/sdk-go/v2/event"
cehttp "github.com/cloudevents/sdk-go/v2/protocol/http"
"github.com/google/go-cmp/cmp"
"go.uber.org/atomic"
"go.uber.org/zap"
"go.uber.org/zap/zaptest"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/kubernetes/scheme"
"knative.dev/pkg/apis"

eventingv1beta1 "knative.dev/eventing/pkg/apis/eventing/v1beta1"
broker "knative.dev/eventing/pkg/mtbroker"
reconcilertesting "knative.dev/eventing/pkg/reconciler/testing"
"knative.dev/pkg/apis"
)

const (
Expand Down Expand Up @@ -364,7 +366,11 @@ func TestReceiver(t *testing.T) {
tc.request.Header.Set(cehttp.ContentType, event.ApplicationCloudEventsJSON)
}
responseWriter := httptest.NewRecorder()
r.ServeHTTP(responseWriter, tc.request)
r.ServeHTTP(&responseWriterWithInvocationsCheck{
ResponseWriter: responseWriter,
headersWritten: atomic.NewBool(false),
t: t,
}, tc.request)

response := responseWriter.Result()

Expand Down Expand Up @@ -425,6 +431,19 @@ func TestReceiver(t *testing.T) {
}
}

type responseWriterWithInvocationsCheck struct {
http.ResponseWriter
headersWritten *atomic.Bool
t *testing.T
}

func (r *responseWriterWithInvocationsCheck) WriteHeader(statusCode int) {
if !r.headersWritten.CAS(false, true) {
r.t.Fatal("WriteHeader invoked more than once")
}
r.ResponseWriter.WriteHeader(statusCode)
}

type mockReporter struct {
eventCountReported bool
eventDispatchTimeReported bool
Expand Down

0 comments on commit cdb060b

Please sign in to comment.