Skip to content
This repository has been archived by the owner on Jun 19, 2022. It is now read-only.

Commit

Permalink
Broker tracing conventions (#1064)
Browse files Browse the repository at this point in the history
* Create broker ingress span

* Add broker ingress trace attributes

* Add trigger trace attributes

* Add HTTP Tracecontext propagation to delivery client provider

* Move testing import to ingress handler test

* Create startSpan helper for filter processor

* Record trigger span attributes in startSpan helper
  • Loading branch information
ian-mi authored May 19, 2020
1 parent 7abeb40 commit c3fad97
Show file tree
Hide file tree
Showing 7 changed files with 93 additions and 19 deletions.
2 changes: 1 addition & 1 deletion cmd/broker/fanout/wire_gen.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion cmd/broker/retry/wire_gen.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 7 additions & 1 deletion pkg/broker/handler/pool/providers.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ import (
cehttp "github.com/cloudevents/sdk-go/v2/protocol/http"
cepubsub "github.com/cloudevents/sdk-go/v2/protocol/pubsub"
"github.com/google/wire"
"go.opencensus.io/plugin/ochttp"
"go.opencensus.io/plugin/ochttp/propagation/tracecontext"
)

var (
Expand All @@ -33,6 +35,10 @@ var (
ceclient.WithTracePropagation(),
}

DefaultHTTPOpts = []cehttp.Option{
cehttp.WithRoundTripper(&ochttp.Transport{Propagation: &tracecontext.HTTPFormat{}}),
}

// ProviderSet provides the fanout and retry sync pools using the default client options. In
// order to inject either pool, ProjectID, []Option, and config.ReadOnlyTargets must be
// externally provided.
Expand All @@ -43,7 +49,7 @@ var (
NewDeliverClient,
NewPubsubClient,
NewRetryClient,
wire.Value([]cehttp.Option(nil)),
wire.Value(DefaultHTTPOpts),
wire.Value(DefaultCEClientOpts),
)
)
Expand Down
31 changes: 26 additions & 5 deletions pkg/broker/handler/processors/filter/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,17 +18,19 @@ package filter

import (
"context"
"fmt"

"github.com/cloudevents/sdk-go/v2/event"
"github.com/cloudevents/sdk-go/v2/extensions"
"go.opencensus.io/trace"
"go.uber.org/zap"
"k8s.io/apimachinery/pkg/types"
"knative.dev/eventing/pkg/logging"
kntracing "knative.dev/eventing/pkg/tracing"

"github.com/google/knative-gcp/pkg/broker/config"
handlerctx "github.com/google/knative-gcp/pkg/broker/handler/context"
"github.com/google/knative-gcp/pkg/broker/handler/processors"
"github.com/google/knative-gcp/pkg/tracing"
)

// Processor is the processor to filter events based on trigger filters.
Expand All @@ -55,11 +57,12 @@ func (p *Processor) Process(ctx context.Context, event *event.Event) error {
return nil
}

if dt, ok := extensions.GetDistributedTracingExtension(*event); ok {
var span *trace.Span
ctx, span = dt.StartChildSpan(ctx, fmt.Sprintf("trigger:%s.%s", target.Name, target.Namespace))
defer span.End()
trigger := types.NamespacedName{
Namespace: target.Namespace,
Name: target.Name,
}
ctx, span := startSpan(ctx, trigger, event)
defer span.End()

if target.FilterAttributes == nil {
return p.Next().Process(ctx, event)
Expand All @@ -72,6 +75,24 @@ func (p *Processor) Process(ctx context.Context, event *event.Event) error {
return nil
}

func startSpan(ctx context.Context, trigger types.NamespacedName, event *event.Event) (context.Context, *trace.Span) {
var span *trace.Span
if dt, ok := extensions.GetDistributedTracingExtension(*event); ok {
ctx, span = dt.StartChildSpan(ctx, kntracing.TriggerMessagingDestination(trigger))
} else {
ctx, span = trace.StartSpan(ctx, kntracing.TriggerMessagingDestination(trigger))
}
if span.IsRecordingEvents() {
span.AddAttributes(
kntracing.MessagingSystemAttribute,
tracing.PubSubProtocolAttribute,
kntracing.TriggerMessagingDestinationAttribute(trigger),
kntracing.MessagingMessageIDAttribute(event.ID()),
)
}
return ctx, span
}

func (p *Processor) passFilter(ctx context.Context, attrs map[string]string, event *event.Event) bool {
// Set standard context attributes. The attributes available may not be
// exactly the same as the attributes defined in the current version of the
Expand Down
40 changes: 29 additions & 11 deletions pkg/broker/ingress/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,18 +24,20 @@ import (
"strings"
"time"

_ "knative.dev/pkg/metrics/testing"

cev2 "github.com/cloudevents/sdk-go/v2"
"github.com/cloudevents/sdk-go/v2/binding"
"github.com/cloudevents/sdk-go/v2/binding/transformer"
"github.com/cloudevents/sdk-go/v2/protocol"
"github.com/cloudevents/sdk-go/v2/protocol/http"
"github.com/google/knative-gcp/pkg/metrics"
"github.com/google/knative-gcp/pkg/tracing"
"github.com/google/wire"
"go.opencensus.io/trace"
"go.uber.org/zap"
"k8s.io/apimachinery/pkg/types"
"knative.dev/eventing/pkg/kncloudevents"
"knative.dev/eventing/pkg/logging"
kntracing "knative.dev/eventing/pkg/tracing"
)

const (
Expand Down Expand Up @@ -103,6 +105,7 @@ func (h *Handler) Start(ctx context.Context) error {
// 3. Convert request to event.
// 4. Send event to decouple sink.
func (h *Handler) ServeHTTP(response nethttp.ResponseWriter, request *nethttp.Request) {
ctx := request.Context()
h.logger.Debug("Serving http", zap.Any("headers", request.Header))
startTime := time.Now()
if request.Method != nethttp.MethodPost {
Expand All @@ -118,7 +121,11 @@ func (h *Handler) ServeHTTP(response nethttp.ResponseWriter, request *nethttp.Re
nethttp.Error(response, msg, nethttp.StatusNotFound)
return
}
ns, broker := pieces[1], pieces[2]
broker := types.NamespacedName{
Namespace: pieces[1],
Name: pieces[2],
}

event, err := h.toEvent(request)
if err != nil {
nethttp.Error(response, err.Error(), nethttp.StatusBadRequest)
Expand All @@ -127,15 +134,26 @@ func (h *Handler) ServeHTTP(response nethttp.ResponseWriter, request *nethttp.Re

event.SetExtension(EventArrivalTime, cev2.Timestamp{Time: time.Now()})

ctx, span := trace.StartSpan(ctx, kntracing.BrokerMessagingDestination(broker))
defer span.End()
if span.IsRecordingEvents() {
span.AddAttributes(
kntracing.MessagingSystemAttribute,
tracing.PubSubProtocolAttribute,
kntracing.BrokerMessagingDestinationAttribute(broker),
kntracing.MessagingMessageIDAttribute(event.ID()),
)
}

// Optimistically set status code to StatusAccepted. It will be updated if there is an error.
// According to the data plane spec (https://github.com/knative/eventing/blob/master/docs/spec/data-plane.md), a
// non-callable SINK (which broker is) MUST respond with 202 Accepted if the request is accepted.
statusCode := nethttp.StatusAccepted
ctx, cancel := context.WithTimeout(request.Context(), decoupleSinkTimeout)
ctx, cancel := context.WithTimeout(ctx, decoupleSinkTimeout)
defer cancel()
defer func() { h.reportMetrics(request.Context(), ns, broker, event, statusCode, startTime) }()
if res := h.decouple.Send(ctx, ns, broker, *event); !cev2.IsACK(res) {
msg := fmt.Sprintf("Error publishing to PubSub for broker %v/%v. event: %+v, err: %v.", ns, broker, event, res)
defer func() { h.reportMetrics(request.Context(), broker, event, statusCode, startTime) }()
if res := h.decouple.Send(ctx, broker.Namespace, broker.Name, *event); !cev2.IsACK(res) {
msg := fmt.Sprintf("Error publishing to PubSub for broker %s. event: %+v, err: %v.", broker, event, res)
h.logger.Error(msg)
statusCode = nethttp.StatusInternalServerError
if errors.Is(res, ErrNotFound) {
Expand Down Expand Up @@ -171,14 +189,14 @@ func (h *Handler) toEvent(request *nethttp.Request) (*cev2.Event, error) {
return event, nil
}

func (h *Handler) reportMetrics(ctx context.Context, ns, broker string, event *cev2.Event, statusCode int, start time.Time) {
func (h *Handler) reportMetrics(ctx context.Context, broker types.NamespacedName, event *cev2.Event, statusCode int, start time.Time) {
args := metrics.IngressReportArgs{
Namespace: ns,
Broker: broker,
Namespace: broker.Namespace,
Broker: broker.Name,
EventType: event.Type(),
ResponseCode: statusCode,
}
if err := h.reporter.ReportEventDispatchTime(ctx, args, time.Since(start)); err != nil {
h.logger.Warn("Failed to record metrics.", zap.Any("namespace", ns), zap.Any("broker", broker), zap.Error(err))
h.logger.Warn("Failed to record metrics.", zap.Any("namespace", broker.Namespace), zap.Any("broker", broker.Name), zap.Error(err))
}
}
2 changes: 2 additions & 0 deletions pkg/broker/ingress/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@ import (
logtest "knative.dev/pkg/logging/testing"
"knative.dev/pkg/metrics/metricskey"
"knative.dev/pkg/metrics/metricstest"

_ "knative.dev/pkg/metrics/testing"
)

const (
Expand Down
27 changes: 27 additions & 0 deletions pkg/tracing/attributes.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
/*
Copyright 2020 Google LLC.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package tracing

import "knative.dev/eventing/pkg/tracing"

const (
PubSubProtocol = "Pub/Sub"
)

var (
PubSubProtocolAttribute = tracing.MessagingProtocolAttribute(PubSubProtocol)
)

0 comments on commit c3fad97

Please sign in to comment.