Skip to content

Commit

Permalink
Minor refactor for broker ingress, adding unittests. (#2270)
Browse files Browse the repository at this point in the history
* minor refactor for ttl management for broker ingress. More unit tests.

* broker metrics were conflicting.

* adding ttl defaulter tests.

* adding cr.

* fix race.

* Do not panic if a view is already registered.

* fix imports.
  • Loading branch information
n3wscott authored and knative-prow-robot committed Dec 19, 2019
1 parent 0d3baaa commit 5631d77
Show file tree
Hide file tree
Showing 12 changed files with 348 additions and 57 deletions.
15 changes: 10 additions & 5 deletions cmd/broker/ingress/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,10 @@ import (

cloudevents "github.com/cloudevents/sdk-go"
"github.com/kelseyhightower/envconfig"
"go.opencensus.io/stats/view"
"go.uber.org/zap"

"go.opencensus.io/stats/view"
"knative.dev/eventing/pkg/broker"
"knative.dev/eventing/pkg/broker/ingress"
"knative.dev/eventing/pkg/kncloudevents"
"knative.dev/eventing/pkg/tracing"
Expand Down Expand Up @@ -58,9 +59,10 @@ const (
// Purposely set them to be equal, as the ingress only connects to its channel.
// These are magic numbers, partly set based on empirical evidence running performance workloads, and partly
// based on what serving is doing. See https://github.com/knative/serving/blob/master/pkg/network/transports.go.
defaultMaxIdleConnections = 1000
defaultMaxIdleConnectionsPerHost = 1000
component = "broker_ingress"
defaultMaxIdleConnections = 1000
defaultMaxIdleConnectionsPerHost = 1000
defaultTTL int32 = 255
component = "broker_ingress"
)

type envConfig struct {
Expand Down Expand Up @@ -146,7 +148,9 @@ func main() {
MaxIdleConns: defaultMaxIdleConnections,
MaxIdleConnsPerHost: defaultMaxIdleConnectionsPerHost,
}
ceClient, err := kncloudevents.NewDefaultClientGivenHttpTransport(httpTransport, connectionArgs)
ceClient, err := kncloudevents.NewDefaultClientGivenHttpTransport(
httpTransport,
&connectionArgs)
if err != nil {
logger.Fatal("Unable to create CE client", zap.Error(err))
}
Expand All @@ -160,6 +164,7 @@ func main() {
BrokerName: env.Broker,
Namespace: env.Namespace,
Reporter: reporter,
Defaulter: broker.TTLDefaulter(logger, defaultTTL),
}

// configMapWatcher does not block, so start it first.
Expand Down
2 changes: 1 addition & 1 deletion pkg/broker/filter/filter_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ func NewHandler(logger *zap.Logger, triggerLister eventinglisters.TriggerNamespa
MaxIdleConns: defaultMaxIdleConnections,
MaxIdleConnsPerHost: defaultMaxIdleConnectionsPerHost,
}
ceClient, err := kncloudevents.NewDefaultClientGivenHttpTransport(httpTransport, connectionArgs)
ceClient, err := kncloudevents.NewDefaultClientGivenHttpTransport(httpTransport, &connectionArgs)
if err != nil {
return nil, err
}
Expand Down
3 changes: 2 additions & 1 deletion pkg/broker/filter/stats_reporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package filter

import (
"context"
"log"
"strconv"
"time"

Expand Down Expand Up @@ -130,7 +131,7 @@ func register() {
},
)
if err != nil {
panic(err)
log.Printf("failed to register opencensus views, %s", err)
}
}

Expand Down
5 changes: 4 additions & 1 deletion pkg/broker/filter/stats_reporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,9 @@ func setup() {

func resetMetrics() {
// OpenCensus metrics carry global state that need to be reset between unit tests.
metricstest.Unregister("event_count", "event_dispatch_latencies", "event_processing_latencies")
metricstest.Unregister(
"event_count",
"event_dispatch_latencies",
"event_processing_latencies")
register()
}
63 changes: 32 additions & 31 deletions pkg/broker/ingress/ingress_handler.go
Original file line number Diff line number Diff line change
@@ -1,3 +1,19 @@
/*
* Copyright 2019 The Knative Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package ingress

import (
Expand All @@ -8,6 +24,7 @@ import (
"time"

cloudevents "github.com/cloudevents/sdk-go"
"github.com/cloudevents/sdk-go/pkg/cloudevents/client"
"go.opencensus.io/trace"
"go.uber.org/zap"
"knative.dev/eventing/pkg/broker"
Expand All @@ -16,8 +33,6 @@ import (

var (
shutdownTimeout = 1 * time.Minute

defaultTTL int32 = 255
)

type Handler struct {
Expand All @@ -27,6 +42,8 @@ type Handler struct {
BrokerName string
Namespace string
Reporter StatsReporter

Defaulter client.EventDefaulter
}

func (h *Handler) Start(ctx context.Context) error {
Expand All @@ -35,7 +52,7 @@ func (h *Handler) Start(ctx context.Context) error {

errCh := make(chan error, 1)
go func() {
errCh <- h.CeClient.StartReceiver(ctx, h.serveHTTP)
errCh <- h.CeClient.StartReceiver(ctx, h.receive)
}()

// Stop either if the receiver stops (sending to errCh) or if stopCh is closed.
Expand All @@ -57,16 +74,16 @@ func (h *Handler) Start(ctx context.Context) error {
}
}

func (h *Handler) serveHTTP(ctx context.Context, event cloudevents.Event, resp *cloudevents.EventResponse) error {
func (h *Handler) receive(ctx context.Context, event cloudevents.Event, resp *cloudevents.EventResponse) error {
// Setting the extension as a string as the CloudEvents sdk does not support non-string extensions.
event.SetExtension(broker.EventArrivalTime, time.Now().Format(time.RFC3339))
event.SetExtension(broker.EventArrivalTime, cloudevents.Timestamp{Time: time.Now()})
tctx := cloudevents.HTTPTransportContextFrom(ctx)
if tctx.Method != http.MethodPost {
resp.Status = http.StatusMethodNotAllowed
return nil
}

// tctx.URI is actually the path...
// tctx.URI is actually the request uri...
if tctx.URI != "/" {
resp.Status = http.StatusNotFound
return nil
Expand All @@ -78,8 +95,15 @@ func (h *Handler) serveHTTP(ctx context.Context, event cloudevents.Event, resp *
eventType: event.Type(),
}

send := h.decrementTTL(&event)
if !send {
if h.Defaulter != nil {
event = h.Defaulter(ctx, event)
}

if ttl, err := broker.GetTTL(event.Context); err != nil || ttl <= 0 {
h.Logger.Debug("dropping event based on TTL status.",
zap.Int32("TTL", ttl),
zap.String("event.id", event.ID()),
zap.Error(err))
// Record the event count.
h.Reporter.ReportEventCount(reporterArgs, http.StatusBadRequest)
return nil
Expand All @@ -99,26 +123,3 @@ func (h *Handler) serveHTTP(ctx context.Context, event cloudevents.Event, resp *
h.Reporter.ReportEventCount(reporterArgs, rtctx.StatusCode)
return err
}

func (h *Handler) decrementTTL(event *cloudevents.Event) bool {
ttl := h.getTTLToSet(event)
if ttl <= 0 {
// TODO send to some form of dead letter queue rather than dropping.
h.Logger.Error("Dropping message due to TTL", zap.Any("event", event))
return false
}

if err := broker.SetTTL(event.Context, ttl); err != nil {
h.Logger.Error("Failed to set TTL", zap.Error(err))
}
return true
}

func (h *Handler) getTTLToSet(event *cloudevents.Event) int32 {
ttl, err := broker.GetTTL(event.Context)
if err != nil {
h.Logger.Debug("Error retrieving TTL, defaulting.", zap.Error(err))
return defaultTTL
}
return ttl - 1
}
137 changes: 131 additions & 6 deletions pkg/broker/ingress/ingress_handler_test.go
Original file line number Diff line number Diff line change
@@ -1,15 +1,34 @@
/*
* Copyright 2019 The Knative Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package ingress

import (
"context"
nethttp "net/http"
"net/url"
"reflect"
"sync"
"testing"
"time"

cloudevents "github.com/cloudevents/sdk-go"
"github.com/cloudevents/sdk-go/pkg/cloudevents/transport/http"
"go.uber.org/zap"
"knative.dev/eventing/pkg/broker"
)

const (
Expand Down Expand Up @@ -37,18 +56,49 @@ func (r *mockReporter) ReportEventDispatchTime(args *ReportArgs, responseCode in
return nil
}

type fakeClient struct{ sent bool }
type fakeClient struct {
sent bool
fn interface{}
mux sync.Mutex
}

func (f *fakeClient) Send(ctx context.Context, event cloudevents.Event) (context.Context, *cloudevents.Event, error) {
f.sent = true
return ctx, &event, nil
}

func (f *fakeClient) StartReceiver(ctx context.Context, fn interface{}) error {
panic("not implemented")
f.mux.Lock()
f.fn = fn
f.mux.Unlock()
<-ctx.Done()
return nil
}

func (f *fakeClient) ready() bool {
f.mux.Lock()
ready := f.fn != nil
f.mux.Unlock()
return ready
}

func TestIngressHandler_ServeHTTP_FAIL(t *testing.T) {
func (f *fakeClient) fakeReceive(t *testing.T, event cloudevents.Event) {
// receive(ctx context.Context, event cloudevents.Event, resp *cloudevents.EventResponse) error

resp := new(cloudevents.EventResponse)
tctx := http.TransportContext{Header: nethttp.Header{}, Method: validHTTPMethod, URI: validURI}
ctx := http.WithTransportContext(context.Background(), tctx)

fnType := reflect.TypeOf(f.fn)
if fnType.Kind() != reflect.Func {
t.Fatal("wrong method type.", fnType.Kind())
}

fn := reflect.ValueOf(f.fn)
_ = fn.Call([]reflect.Value{reflect.ValueOf(ctx), reflect.ValueOf(event), reflect.ValueOf(resp)})
}

func TestIngressHandler_Receive_FAIL(t *testing.T) {
testCases := map[string]struct {
httpmethod string
URI string
Expand Down Expand Up @@ -83,12 +133,13 @@ func TestIngressHandler_ServeHTTP_FAIL(t *testing.T) {
BrokerName: brokerName,
Namespace: namespace,
Reporter: reporter,
Defaulter: broker.TTLDefaulter(zap.NewNop(), 5),
}
event := cloudevents.NewEvent(cloudevents.VersionV1)
resp := new(cloudevents.EventResponse)
tctx := http.TransportContext{Header: nethttp.Header{}, Method: tc.httpmethod, URI: tc.URI}
ctx := http.WithTransportContext(context.Background(), tctx)
_ = handler.serveHTTP(ctx, event, resp)
_ = handler.receive(ctx, event, resp)
if resp.Status != tc.expectedStatus {
t.Errorf("Unexpected status code. Expected %v, Actual %v", tc.expectedStatus, resp.Status)
}
Expand All @@ -102,7 +153,41 @@ func TestIngressHandler_ServeHTTP_FAIL(t *testing.T) {
}
}

func TestIngressHandler_ServeHTTP_Succeed(t *testing.T) {
func TestIngressHandler_Receive_Succeed(t *testing.T) {
client := &fakeClient{}
reporter := &mockReporter{}
handler := Handler{
Logger: zap.NewNop(),
CeClient: client,
ChannelURI: &url.URL{
Scheme: urlScheme,
Host: urlHost,
Path: urlPath,
},
BrokerName: brokerName,
Namespace: namespace,
Reporter: reporter,
Defaulter: broker.TTLDefaulter(zap.NewNop(), 5),
}

event := cloudevents.NewEvent()
resp := new(cloudevents.EventResponse)
tctx := http.TransportContext{Header: nethttp.Header{}, Method: validHTTPMethod, URI: validURI}
ctx := http.WithTransportContext(context.Background(), tctx)
_ = handler.receive(ctx, event, resp)

if !client.sent {
t.Errorf("client should invoke send function")
}
if !reporter.eventCountReported {
t.Errorf("event count should have been reported")
}
if !reporter.eventDispatchTimeReported {
t.Errorf("event dispatch time should have been reported")
}
}

func TestIngressHandler_Receive_NoTTL(t *testing.T) {
client := &fakeClient{}
reporter := &mockReporter{}
handler := Handler{
Expand All @@ -121,7 +206,47 @@ func TestIngressHandler_ServeHTTP_Succeed(t *testing.T) {
resp := new(cloudevents.EventResponse)
tctx := http.TransportContext{Header: nethttp.Header{}, Method: validHTTPMethod, URI: validURI}
ctx := http.WithTransportContext(context.Background(), tctx)
_ = handler.serveHTTP(ctx, event, resp)
_ = handler.receive(ctx, event, resp)
if client.sent {
t.Errorf("client should NOT invoke send function")
}
if !reporter.eventCountReported {
t.Errorf("event count should have been reported")
}
}

func TestIngressHandler_Start(t *testing.T) {
client := &fakeClient{}
reporter := &mockReporter{}
handler := Handler{
Logger: zap.NewNop(),
CeClient: client,
ChannelURI: &url.URL{
Scheme: urlScheme,
Host: urlHost,
Path: urlPath,
},
BrokerName: brokerName,
Namespace: namespace,
Reporter: reporter,
Defaulter: broker.TTLDefaulter(zap.NewNop(), 5),
}

ctx, cancel := context.WithCancel(context.Background())
go func() {
if err := handler.Start(ctx); err != nil {
t.Fatal(err)
}
}()
// Need time for the handler to start up. Wait.
for !client.ready() {
time.Sleep(1 * time.Millisecond)
}

event := cloudevents.NewEvent()
client.fakeReceive(t, event)
cancel()

if !client.sent {
t.Errorf("client should invoke send function")
}
Expand Down
Loading

0 comments on commit 5631d77

Please sign in to comment.