Skip to content
This repository has been archived by the owner on Jun 19, 2022. It is now read-only.

Add timeout delivery metrics #1578

Merged
merged 3 commits into from
Aug 14, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions pkg/broker/handler/fanout_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -334,6 +334,7 @@ func TestFanoutSyncPoolE2E(t *testing.T) {
}

expectMetrics.ExpectProcessing(t, t1.Name)
expectMetrics.ExpectTimeout(t, t1.Name)
expectMetrics.Expect200(t, t2.Name)
expectMetrics.Verify(t)
})
Expand Down
16 changes: 15 additions & 1 deletion pkg/broker/handler/processors/deliver/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"errors"
"fmt"
"net/http"
"net/url"
"time"

"github.com/cloudevents/sdk-go/v2/binding"
Expand Down Expand Up @@ -149,15 +150,28 @@ func (p *Processor) deliver(ctx context.Context, target *config.Target, broker *
// Remove hops from forwarded event.
resp, err := p.sendMsg(ctx, target.Address, msg, transformer.DeleteExtension(eventutil.HopsAttribute))
if err != nil {
var result *url.Error
if errors.As(err, &result) && result.Timeout() {
// If the delivery is cancelled because of timeout, report event dispatch time without resp status code.
p.StatsReporter.ReportEventDispatchTime(ctx, time.Since(startTime))
}
return err
}

defer func() {
if err := resp.Body.Close(); err != nil {
logging.FromContext(ctx).Warn("failed to close response body", zap.Error(err))
}
}()

p.StatsReporter.ReportEventDispatchTime(ctx, time.Since(startTime), resp.StatusCode)
// Insert status code tag into context.
cctx, err := metrics.AddRespStatusCodeTags(ctx, resp.StatusCode)
if err != nil {
logging.FromContext(ctx).Error("failed to add status code tags to context", zap.Error(err))
}
// Report event dispatch time with resp status code.
p.StatsReporter.ReportEventDispatchTime(cctx, time.Since(startTime))

if resp.StatusCode/100 != 2 {
return fmt.Errorf("event delivery failed: HTTP status code %d", resp.StatusCode)
}
Expand Down
19 changes: 11 additions & 8 deletions pkg/metrics/delivery_reporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,12 @@ import (
"strconv"
"time"

"github.com/google/knative-gcp/pkg/broker/config"
"go.opencensus.io/stats"
"go.opencensus.io/stats/view"
"go.opencensus.io/tag"
"knative.dev/pkg/metrics"

"github.com/google/knative-gcp/pkg/broker/config"
)

type DeliveryMetricsKey int
Expand Down Expand Up @@ -121,14 +122,9 @@ func NewDeliveryReporter(podName PodName, containerName ContainerName) (*Deliver
}

// ReportEventDispatchTime records the dispatch duration d on the
// dispatchTimeInMsecM measure via metrics.Record, using ctx as the recording
// context.
func (r *DeliveryReporter) ReportEventDispatchTime(ctx context.Context, d time.Duration, responseCode int) {
func (r *DeliveryReporter) ReportEventDispatchTime(ctx context.Context, d time.Duration) {
// Convert the time.Duration (nanoseconds) to milliseconds. d/time.Millisecond
// is an integer division, so any sub-millisecond remainder is truncated
// before the float64 conversion.
metrics.Record(ctx, r.dispatchTimeInMsecM.M(float64(d/time.Millisecond)),
stats.WithTags(
tag.Insert(ResponseCodeKey, strconv.Itoa(responseCode)),
tag.Insert(ResponseCodeClassKey, metrics.ResponseCodeClass(responseCode)),
),
)
metrics.Record(ctx, r.dispatchTimeInMsecM.M(float64(d/time.Millisecond)))
}

// StartEventProcessing records the start of event processing for delivery within the given context.
Expand Down Expand Up @@ -169,6 +165,13 @@ func (r *DeliveryReporter) AddTags(ctx context.Context) (context.Context, error)
)
}

// AddRespStatusCodeTags returns a context derived from ctx whose tag map has
// ResponseCodeKey set to the decimal form of responseCode and
// ResponseCodeClassKey set to the value of metrics.ResponseCodeClass for that
// code. Any error from tag.New is returned unchanged.
func AddRespStatusCodeTags(ctx context.Context, responseCode int) (context.Context, error) {
	codeTag := tag.Insert(ResponseCodeKey, strconv.Itoa(responseCode))
	classTag := tag.Insert(ResponseCodeClassKey, metrics.ResponseCodeClass(responseCode))
	return tag.New(ctx, codeTag, classTag)
}

func AddTargetTags(ctx context.Context, target *config.Target) (context.Context, error) {
return tag.New(ctx,
tag.Insert(NamespaceNameKey, target.Namespace),
Expand Down
20 changes: 15 additions & 5 deletions pkg/metrics/delivery_reporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,12 +63,13 @@ func TestReportEventDispatchTime(t *testing.T) {
if err != nil {
t.Fatal(err)
}
cctx, _ := AddRespStatusCodeTags(ctx, 202)
reportertest.ExpectMetrics(t, func() error {
r.ReportEventDispatchTime(ctx, 1100*time.Millisecond, 202)
r.ReportEventDispatchTime(cctx, 1100*time.Millisecond)
return nil
})
reportertest.ExpectMetrics(t, func() error {
r.ReportEventDispatchTime(ctx, 9100*time.Millisecond, 202)
r.ReportEventDispatchTime(cctx, 9100*time.Millisecond)
return nil
})
metricstest.CheckCountData(t, "event_count", wantTags, 2)
Expand Down Expand Up @@ -120,8 +121,17 @@ func TestReportEventProcessingTime(t *testing.T) {
reportertest.ExpectMetrics(t, func() error {
return r.reportEventProcessingTime(ctx, startTime.Add(9100*time.Millisecond))
})

// Test report event dispatch time without status code.
reportertest.ExpectMetrics(t, func() error {
r.ReportEventDispatchTime(ctx, 1100*time.Millisecond)
return nil
})
reportertest.ExpectMetrics(t, func() error {
r.ReportEventDispatchTime(ctx, 9100*time.Millisecond)
return nil
})
metricstest.CheckDistributionData(t, "event_processing_latencies", wantTags, 2, 1100.0, 9100.0)
metricstest.CheckDistributionData(t, "event_dispatch_latencies", wantTags, 2, 1100.0, 9100.0)
}

func TestMetricsWithEmptySourceAndTypeFilter(t *testing.T) {
Expand Down Expand Up @@ -154,9 +164,9 @@ func TestMetricsWithEmptySourceAndTypeFilter(t *testing.T) {
if err != nil {
t.Fatal(err)
}

cctx, _ := AddRespStatusCodeTags(ctx, 202)
reportertest.ExpectMetrics(t, func() error {
r.ReportEventDispatchTime(ctx, 1100*time.Millisecond, 202)
r.ReportEventDispatchTime(cctx, 1100*time.Millisecond)
return nil
})
metricstest.CheckCountData(t, "event_count", wantTags, 1)
Expand Down
51 changes: 50 additions & 1 deletion pkg/metrics/testing/delivery.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,13 +38,15 @@ type ExpectDelivery struct {
TriggerTags map[string]Tags
ProcessingCount map[string]int64
DeliveryCount map[deliveryKey]int64
TimeoutCount map[string]int64
}

// NewExpectDelivery returns an ExpectDelivery whose TriggerTags,
// ProcessingCount, DeliveryCount and TimeoutCount maps are allocated and
// empty, so expectations can be recorded without further setup.
func NewExpectDelivery() ExpectDelivery {
	var expect ExpectDelivery
	expect.TriggerTags = make(map[string]Tags)
	expect.ProcessingCount = make(map[string]int64)
	expect.DeliveryCount = make(map[deliveryKey]int64)
	expect.TimeoutCount = make(map[string]int64)
	return expect
}

Expand Down Expand Up @@ -76,6 +78,13 @@ func (e ExpectDelivery) ExpectDelivery(t *testing.T, trigger string, code int) {
e.DeliveryCount[key] = e.DeliveryCount[key] + 1
}

// ExpectTimeout records that one more timed-out delivery is expected for
// trigger by incrementing its entry in TimeoutCount.
//
// The trigger must already have an entry in TriggerTags; otherwise the test
// is failed immediately via t.Fatalf.
func (e ExpectDelivery) ExpectTimeout(t *testing.T, trigger string) {
	// Mark this function as a helper so a failure is attributed to the calling test line.
	t.Helper()
	if _, ok := e.TriggerTags[trigger]; !ok {
		t.Fatalf("trigger %q not defined", trigger)
	}
	e.TimeoutCount[trigger]++
}

func (e ExpectDelivery) Expect200(t *testing.T, trigger string) {
e.ExpectProcessing(t, trigger)
e.ExpectDelivery(t, trigger, 200)
Expand Down Expand Up @@ -109,6 +118,9 @@ func (e ExpectDelivery) attemptVerify() error {
if err := e.verifyDelivery("event_dispatch_latencies"); err != nil {
return err
}
if err := e.verifyTimeout(); err != nil {
return err
}
return nil
}

Expand All @@ -127,7 +139,10 @@ func (e ExpectDelivery) verifyDelivery(viewName string) error {
if !ok {
return fmt.Errorf("missing trigger_name tag for row: %v", row)
}
if code, err := strconv.Atoi(tags["response_code"]); err != nil {
if tags["response_code"] == "" {
// Skip timeout records, which do not have a response code.
continue
} else if code, err := strconv.Atoi(tags["response_code"]); err != nil {
return fmt.Errorf("invalid response code in tags: %v", tags)
} else {
if got[deliveryKey{Trigger: trigger, Code: code}], err = getCount(row); err != nil {
Expand Down Expand Up @@ -178,6 +193,40 @@ func (e ExpectDelivery) verifyProcessing() error {
return nil
}

// verifyTimeout compares the expected per-trigger timeout counts in
// TimeoutCount against the rows of the "event_dispatch_latencies" view that
// carry no response_code tag.
//
// It returns an error if the view cannot be retrieved, a row lacks the
// trigger_name tag, a timeout row's tags differ from the trigger's entry in
// TriggerTags, a row's count cannot be read, or the collected counts differ
// from TimeoutCount.
func (e ExpectDelivery) verifyTimeout() error {
	rows, err := view.RetrieveData("event_dispatch_latencies")
	if err != nil {
		return err
	}

	timeouts := make(map[string]int64)
	for _, row := range rows {
		// Flatten the row's tags into a name -> value map for lookup and comparison.
		rowTags := make(map[string]string, len(row.Tags))
		for _, rt := range row.Tags {
			rowTags[rt.Key.Name()] = rt.Value
		}

		trigger, found := rowTags["trigger_name"]
		if !found {
			return fmt.Errorf("missing trigger_name tag for row: %v", row)
		}

		// Rows with a response code are not timeout rows; skip them here.
		if rowTags["response_code"] != "" {
			continue
		}

		if diff := cmp.Diff(e.TriggerTags[trigger], Tags(rowTags)); diff != "" {
			return fmt.Errorf("unexpected tags (-want, +got) = %v", diff)
		}

		count, err := getCount(row)
		if err != nil {
			return err
		}
		timeouts[trigger] = count
	}

	if diff := cmp.Diff(e.TimeoutCount, timeouts); diff != "" {
		return fmt.Errorf("unexpected timeout event_dispatch_latencies measurement count (-want, +got) = %v", diff)
	}
	return nil
}

func getCount(row *view.Row) (int64, error) {
switch data := row.Data.(type) {
case *view.CountData:
Expand Down