Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Eventing TLS: Add scheme label to metrics #7581

Merged
merged 19 commits into from
Jan 22, 2024
22 changes: 16 additions & 6 deletions pkg/adapter/v2/cloudevents.go
Original file line number Diff line number Diff line change
Expand Up @@ -210,11 +210,19 @@ func NewClient(cfg ClientConfig) (Client, error) {
reporter: cfg.Reporter,
crStatusEventClient: cfg.CrStatusEventClient,
oidcTokenProvider: cfg.TokenProvider,
scheme: "http",
}

if cfg.Env != nil {
client.audience = cfg.Env.GetAudience()
client.oidcServiceAccountName = cfg.Env.GetOIDCServiceAccountName()
sinkURI := cfg.Env.GetSink()
if sinkURI != "" {
parsedUrl, err := url.Parse(sinkURI)
if err == nil {
client.scheme = parsedUrl.Scheme
}
}
}

return client, nil
Expand All @@ -234,12 +242,12 @@ func setTimeOut(duration time.Duration) http.Option {
}

type client struct {
ceClient cloudevents.Client
ceOverrides *duckv1.CloudEventOverrides
reporter source.StatsReporter
crStatusEventClient *crstatusevent.CRStatusEventClient
closeIdler closeIdler

ceClient cloudevents.Client
ceOverrides *duckv1.CloudEventOverrides
reporter source.StatsReporter
crStatusEventClient *crstatusevent.CRStatusEventClient
closeIdler closeIdler
scheme string
oidcTokenProvider *auth.OIDCTokenProvider
audience *string
oidcServiceAccountName *types.NamespacedName
Expand Down Expand Up @@ -302,13 +310,15 @@ func (c *client) reportMetrics(ctx context.Context, event cloudevents.Event, res
if c.reporter == nil {
return
}

tags := MetricTagFromContext(ctx)
reportArgs := &source.ReportArgs{
Namespace: tags.Namespace,
EventSource: event.Source(),
EventType: event.Type(),
Name: tags.Name,
ResourceGroup: tags.ResourceGroup,
EventScheme: c.scheme,
}

var rres *http.RetriesResult
Expand Down
19 changes: 18 additions & 1 deletion pkg/broker/filter/filter_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -241,6 +241,12 @@ func (h *Handler) handleDispatchToReplyRequest(ctx context.Context, trigger *eve
requestType: "reply_forward",
}

if request.TLS != nil {
reportArgs.requestScheme = "https"
} else {
reportArgs.requestScheme = "http"
}

h.logger.Info("sending to reply", zap.Any("target", target))

// since the broker-filter acts here like a proxy, we don't filter headers
Expand Down Expand Up @@ -277,6 +283,12 @@ func (h *Handler) handleDispatchToDLSRequest(ctx context.Context, trigger *event
requestType: "dls_forward",
}

if request.TLS != nil {
reportArgs.requestScheme = "https"
} else {
reportArgs.requestScheme = "http"
}

h.logger.Info("sending to dls", zap.Any("target", target))

// since the broker-filter acts here like a proxy, we don't filter headers
Expand Down Expand Up @@ -312,6 +324,12 @@ func (h *Handler) handleDispatchToSubscriberRequest(ctx context.Context, trigger
requestType: "filter",
}

if request.TLS != nil {
reportArgs.requestScheme = "https"
} else {
reportArgs.requestScheme = "http"
}

subscriberURI := trigger.Status.SubscriberURI
if subscriberURI == nil {
// Record the event count.
Expand Down Expand Up @@ -342,7 +360,6 @@ func (h *Handler) handleDispatchToSubscriberRequest(ctx context.Context, trigger
}

func (h *Handler) send(ctx context.Context, writer http.ResponseWriter, headers http.Header, target duckv1.Addressable, reportArgs *ReportArgs, event *cloudevents.Event, t *eventingv1.Trigger, ttl int32) {

additionalHeaders := headers.Clone()
additionalHeaders.Set(apis.KnNamespaceHeader, t.GetNamespace())

Expand Down
27 changes: 15 additions & 12 deletions pkg/broker/filter/stats_reporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,18 +67,20 @@ var (
// go.opencensus.io/tag/validate.go. Currently those restrictions are:
// - length between 1 and 255 inclusive
// - characters are printable US-ASCII
triggerFilterTypeKey = tag.MustNewKey(eventingmetrics.LabelFilterType)
triggerFilterRequestTypeKey = tag.MustNewKey("filter_request_type")
responseCodeKey = tag.MustNewKey(eventingmetrics.LabelResponseCode)
responseCodeClassKey = tag.MustNewKey(eventingmetrics.LabelResponseCodeClass)
triggerFilterTypeKey = tag.MustNewKey(eventingmetrics.LabelFilterType)
triggerFilterRequestTypeKey = tag.MustNewKey("filter_request_type")
triggerFilterRequestSchemeKey = tag.MustNewKey(eventingmetrics.LabelEventScheme)
responseCodeKey = tag.MustNewKey(eventingmetrics.LabelResponseCode)
responseCodeClassKey = tag.MustNewKey(eventingmetrics.LabelResponseCodeClass)
)

type ReportArgs struct {
ns string
trigger string
broker string
filterType string
requestType string
ns string
trigger string
broker string
filterType string
requestType string
requestScheme string
}

func init() {
Expand Down Expand Up @@ -116,19 +118,19 @@ func register() {
Description: eventCountM.Description(),
Measure: eventCountM,
Aggregation: view.Count(),
TagKeys: []tag.Key{triggerFilterTypeKey, triggerFilterRequestTypeKey, responseCodeKey, responseCodeClassKey, broker.UniqueTagKey, broker.ContainerTagKey},
TagKeys: []tag.Key{triggerFilterTypeKey, triggerFilterRequestTypeKey, triggerFilterRequestSchemeKey, responseCodeKey, responseCodeClassKey, broker.UniqueTagKey, broker.ContainerTagKey},
},
&view.View{
Description: dispatchTimeInMsecM.Description(),
Measure: dispatchTimeInMsecM,
Aggregation: view.Distribution(metrics.Buckets125(1, 10000)...), // 1, 2, 5, 10, 20, 50, 100, 1000, 5000, 10000
TagKeys: []tag.Key{triggerFilterTypeKey, triggerFilterRequestTypeKey, responseCodeKey, responseCodeClassKey, broker.UniqueTagKey, broker.ContainerTagKey},
TagKeys: []tag.Key{triggerFilterTypeKey, triggerFilterRequestTypeKey, triggerFilterRequestSchemeKey, responseCodeKey, responseCodeClassKey, broker.UniqueTagKey, broker.ContainerTagKey},
},
&view.View{
Description: processingTimeInMsecM.Description(),
Measure: processingTimeInMsecM,
Aggregation: view.Distribution(metrics.Buckets125(1, 10000)...), // 1, 2, 5, 10, 20, 50, 100, 1000, 5000, 10000
TagKeys: []tag.Key{triggerFilterTypeKey, triggerFilterRequestTypeKey, broker.UniqueTagKey, broker.ContainerTagKey},
TagKeys: []tag.Key{triggerFilterTypeKey, triggerFilterRequestTypeKey, triggerFilterRequestSchemeKey, broker.UniqueTagKey, broker.ContainerTagKey},
},
)
if err != nil {
Expand Down Expand Up @@ -190,6 +192,7 @@ func (r *reporter) generateTag(args *ReportArgs, tags ...tag.Mutator) (context.C
tag.Insert(broker.UniqueTagKey, r.uniqueName),
tag.Insert(triggerFilterTypeKey, valueOrAny(args.filterType)),
tag.Insert(triggerFilterRequestTypeKey, args.requestType),
tag.Insert(triggerFilterRequestSchemeKey, args.requestScheme),
)...)
return ctx, err
}
Expand Down
10 changes: 6 additions & 4 deletions pkg/broker/filter/stats_reporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,10 +96,11 @@ func TestReporterEmptySourceAndTypeFilter(t *testing.T) {
setup()

args := &ReportArgs{
ns: "testns",
trigger: "testtrigger",
broker: "testbroker",
filterType: "",
ns: "testns",
trigger: "testtrigger",
broker: "testbroker",
filterType: "",
requestScheme: "http",
}

r := NewStatsReporter("testcontainer", "testpod")
Expand All @@ -110,6 +111,7 @@ func TestReporterEmptySourceAndTypeFilter(t *testing.T) {
metrics.LabelResponseCodeClass: "2xx",
broker.LabelContainerName: "testcontainer",
broker.LabelUniqueName: "testpod",
metrics.LabelEventScheme: "http",
}

resource := resource.Resource{
Expand Down
6 changes: 6 additions & 0 deletions pkg/broker/ingress/ingress_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -262,6 +262,12 @@ func (h *Handler) ServeHTTP(writer http.ResponseWriter, request *http.Request) {
eventType: event.Type(),
}

if request.TLS != nil {
reporterArgs.eventScheme = "https"
} else {
reporterArgs.eventScheme = "http"
}

statusCode, dispatchTime := h.receive(ctx, utils.PassThroughHeaders(request.Header), event, broker)
if dispatchTime > kncloudevents.NoDuration {
_ = h.Reporter.ReportEventDispatchTime(reporterArgs, statusCode, dispatchTime)
Expand Down
19 changes: 13 additions & 6 deletions pkg/broker/ingress/stats_reporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,14 +55,16 @@ var (
// - length between 1 and 255 inclusive
// - characters are printable US-ASCII
eventTypeKey = tag.MustNewKey(eventingmetrics.LabelEventType)
eventSchemeKey = tag.MustNewKey(eventingmetrics.LabelEventScheme)
responseCodeKey = tag.MustNewKey(eventingmetrics.LabelResponseCode)
responseCodeClassKey = tag.MustNewKey(eventingmetrics.LabelResponseCodeClass)
)

type ReportArgs struct {
ns string
broker string
eventType string
ns string
broker string
eventType string
eventScheme string
}

func init() {
Expand All @@ -75,8 +77,10 @@ type StatsReporter interface {
ReportEventDispatchTime(args *ReportArgs, responseCode int, d time.Duration) error
}

var _ StatsReporter = (*reporter)(nil)
var emptyContext = context.Background()
var (
_ StatsReporter = (*reporter)(nil)
emptyContext = context.Background()
)

// Reporter holds cached metric objects to report ingress metrics.
type reporter struct {
Expand All @@ -95,10 +99,12 @@ func NewStatsReporter(container, uniqueName string) StatsReporter {
func register() {
tagKeys := []tag.Key{
eventTypeKey,
eventSchemeKey,
responseCodeKey,
responseCodeClassKey,
broker.ContainerTagKey,
broker.UniqueTagKey}
broker.UniqueTagKey,
}

// Create view to see our measurements.
err := metrics.RegisterResourceView(
Expand Down Expand Up @@ -154,6 +160,7 @@ func (r *reporter) generateTag(args *ReportArgs, responseCode int) (context.Cont
tag.Insert(broker.ContainerTagKey, r.container),
tag.Insert(broker.UniqueTagKey, r.uniqueName),
tag.Insert(eventTypeKey, args.eventType),
tag.Insert(eventSchemeKey, args.eventScheme),
tag.Insert(responseCodeKey, strconv.Itoa(responseCode)),
tag.Insert(responseCodeClassKey, metrics.ResponseCodeClass(responseCode)))
}
8 changes: 5 additions & 3 deletions pkg/broker/ingress/stats_reporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,10 @@ func TestStatsReporter(t *testing.T) {
setup()

args := &ReportArgs{
ns: "testns",
broker: "testbroker",
eventType: "testeventtype",
ns: "testns",
broker: "testbroker",
eventType: "testeventtype",
eventScheme: "http",
}

r := NewStatsReporter("testcontainer", "testpod")
Expand All @@ -45,6 +46,7 @@ func TestStatsReporter(t *testing.T) {
metrics.LabelResponseCodeClass: "2xx",
broker.LabelUniqueName: "testpod",
broker.LabelContainerName: "testcontainer",
metrics.LabelEventScheme: "http",
}

resource := resource.Resource{
Expand Down
6 changes: 6 additions & 0 deletions pkg/channel/event_receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -231,6 +231,12 @@ func (r *EventReceiver) ServeHTTP(response nethttp.ResponseWriter, request *neth

args.Ns = channel.Namespace

if request.TLS != nil {
args.EventScheme = "https"
} else {
args.EventScheme = "http"
}

event, err := http.NewEventFromHTTPRequest(request)
if err != nil {
r.logger.Warn("failed to extract event from request", zap.Error(err))
Expand Down
Loading
Loading