Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Eventing TLS: Add scheme label to metrics #7581

Merged
merged 19 commits into from
Jan 22, 2024
23 changes: 17 additions & 6 deletions pkg/adapter/v2/cloudevents.go
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,7 @@ func NewClient(cfg ClientConfig) (Client, error) {
if cfg.Env != nil {
client.audience = cfg.Env.GetAudience()
client.oidcServiceAccountName = cfg.Env.GetOIDCServiceAccountName()
client.sinkURI = cfg.Env.GetSink()
}

return client, nil
Expand All @@ -234,12 +235,12 @@ func setTimeOut(duration time.Duration) http.Option {
}

type client struct {
ceClient cloudevents.Client
ceOverrides *duckv1.CloudEventOverrides
reporter source.StatsReporter
crStatusEventClient *crstatusevent.CRStatusEventClient
closeIdler closeIdler

ceClient cloudevents.Client
ceOverrides *duckv1.CloudEventOverrides
reporter source.StatsReporter
crStatusEventClient *crstatusevent.CRStatusEventClient
closeIdler closeIdler
sinkURI string
oidcTokenProvider *auth.OIDCTokenProvider
audience *string
oidcServiceAccountName *types.NamespacedName
Expand Down Expand Up @@ -302,13 +303,23 @@ func (c *client) reportMetrics(ctx context.Context, event cloudevents.Event, res
if c.reporter == nil {
return
}
scheme := "http" // Default value

if c.sinkURI != "" {
parsedUrl, err := url.Parse(c.sinkURI)
if err == nil {
scheme = parsedUrl.Scheme
}
}
sadath-12 marked this conversation as resolved.
Show resolved Hide resolved

tags := MetricTagFromContext(ctx)
reportArgs := &source.ReportArgs{
Namespace: tags.Namespace,
EventSource: event.Source(),
EventType: event.Type(),
Name: tags.Name,
ResourceGroup: tags.ResourceGroup,
EventScheme: scheme,
}

var rres *http.RetriesResult
Expand Down
54 changes: 40 additions & 14 deletions pkg/broker/filter/filter_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -235,10 +235,19 @@ func (h *Handler) handleDispatchToReplyRequest(ctx context.Context, trigger *eve
target := broker.Status.Address

reportArgs := &ReportArgs{
ns: trigger.Namespace,
trigger: trigger.Name,
broker: trigger.Spec.Broker,
requestType: "reply_forward",
ns: trigger.Namespace,
trigger: trigger.Name,
broker: trigger.Spec.Broker,
requestType: "reply_forward",
requestScheme: request.URL.Scheme,
sadath-12 marked this conversation as resolved.
Show resolved Hide resolved
}

if reportArgs.requestScheme == "" {
if request.TLS != nil {
reportArgs.requestScheme = "https"
} else {
reportArgs.requestScheme = "http"
}
sadath-12 marked this conversation as resolved.
Show resolved Hide resolved
}

h.logger.Info("sending to reply", zap.Any("target", target))
Expand Down Expand Up @@ -271,10 +280,19 @@ func (h *Handler) handleDispatchToDLSRequest(ctx context.Context, trigger *event
}

reportArgs := &ReportArgs{
ns: trigger.Namespace,
trigger: trigger.Name,
broker: trigger.Spec.Broker,
requestType: "dls_forward",
ns: trigger.Namespace,
trigger: trigger.Name,
broker: trigger.Spec.Broker,
requestType: "dls_forward",
requestScheme: request.URL.Scheme,
sadath-12 marked this conversation as resolved.
Show resolved Hide resolved
sadath-12 marked this conversation as resolved.
Show resolved Hide resolved
}

if reportArgs.requestScheme == "" {
if request.TLS != nil {
reportArgs.requestScheme = "https"
} else {
reportArgs.requestScheme = "http"
}
}

h.logger.Info("sending to dls", zap.Any("target", target))
Expand Down Expand Up @@ -305,11 +323,20 @@ func (h *Handler) handleDispatchToSubscriberRequest(ctx context.Context, trigger
}

reportArgs := &ReportArgs{
ns: trigger.Namespace,
trigger: trigger.Name,
broker: trigger.Spec.Broker,
filterType: triggerFilterAttribute(trigger.Spec.Filter, "type"),
requestType: "filter",
ns: trigger.Namespace,
trigger: trigger.Name,
broker: trigger.Spec.Broker,
filterType: triggerFilterAttribute(trigger.Spec.Filter, "type"),
requestType: "filter",
requestScheme: request.URL.Scheme,
sadath-12 marked this conversation as resolved.
Show resolved Hide resolved
sadath-12 marked this conversation as resolved.
Show resolved Hide resolved
}

if reportArgs.requestScheme == "" {
if request.TLS != nil {
reportArgs.requestScheme = "https"
} else {
reportArgs.requestScheme = "http"
}
}

subscriberURI := trigger.Status.SubscriberURI
Expand Down Expand Up @@ -342,7 +369,6 @@ func (h *Handler) handleDispatchToSubscriberRequest(ctx context.Context, trigger
}

func (h *Handler) send(ctx context.Context, writer http.ResponseWriter, headers http.Header, target duckv1.Addressable, reportArgs *ReportArgs, event *cloudevents.Event, t *eventingv1.Trigger, ttl int32) {

additionalHeaders := headers.Clone()
additionalHeaders.Set(apis.KnNamespaceHeader, t.GetNamespace())

Expand Down
27 changes: 15 additions & 12 deletions pkg/broker/filter/stats_reporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,18 +67,20 @@ var (
// go.opencensus.io/tag/validate.go. Currently those restrictions are:
// - length between 1 and 255 inclusive
// - characters are printable US-ASCII
triggerFilterTypeKey = tag.MustNewKey(eventingmetrics.LabelFilterType)
triggerFilterRequestTypeKey = tag.MustNewKey("filter_request_type")
responseCodeKey = tag.MustNewKey(eventingmetrics.LabelResponseCode)
responseCodeClassKey = tag.MustNewKey(eventingmetrics.LabelResponseCodeClass)
triggerFilterTypeKey = tag.MustNewKey(eventingmetrics.LabelFilterType)
triggerFilterRequestTypeKey = tag.MustNewKey("filter_request_type")
triggerFilterRequestSchemeKey = tag.MustNewKey(eventingmetrics.LabelEventScheme)
responseCodeKey = tag.MustNewKey(eventingmetrics.LabelResponseCode)
responseCodeClassKey = tag.MustNewKey(eventingmetrics.LabelResponseCodeClass)
)

type ReportArgs struct {
ns string
trigger string
broker string
filterType string
requestType string
ns string
trigger string
broker string
filterType string
requestType string
requestScheme string
}

func init() {
Expand Down Expand Up @@ -116,19 +118,19 @@ func register() {
Description: eventCountM.Description(),
Measure: eventCountM,
Aggregation: view.Count(),
TagKeys: []tag.Key{triggerFilterTypeKey, triggerFilterRequestTypeKey, responseCodeKey, responseCodeClassKey, broker.UniqueTagKey, broker.ContainerTagKey},
TagKeys: []tag.Key{triggerFilterTypeKey, triggerFilterRequestTypeKey, triggerFilterRequestSchemeKey, responseCodeKey, responseCodeClassKey, broker.UniqueTagKey, broker.ContainerTagKey},
},
&view.View{
Description: dispatchTimeInMsecM.Description(),
Measure: dispatchTimeInMsecM,
Aggregation: view.Distribution(metrics.Buckets125(1, 10000)...), // 1, 2, 5, 10, 20, 50, 100, 1000, 5000, 10000
TagKeys: []tag.Key{triggerFilterTypeKey, triggerFilterRequestTypeKey, responseCodeKey, responseCodeClassKey, broker.UniqueTagKey, broker.ContainerTagKey},
TagKeys: []tag.Key{triggerFilterTypeKey, triggerFilterRequestTypeKey, triggerFilterRequestSchemeKey, responseCodeKey, responseCodeClassKey, broker.UniqueTagKey, broker.ContainerTagKey},
},
&view.View{
Description: processingTimeInMsecM.Description(),
Measure: processingTimeInMsecM,
Aggregation: view.Distribution(metrics.Buckets125(1, 10000)...), // 1, 2, 5, 10, 20, 50, 100, 1000, 5000, 10000
TagKeys: []tag.Key{triggerFilterTypeKey, triggerFilterRequestTypeKey, broker.UniqueTagKey, broker.ContainerTagKey},
TagKeys: []tag.Key{triggerFilterTypeKey, triggerFilterRequestTypeKey, triggerFilterRequestSchemeKey, broker.UniqueTagKey, broker.ContainerTagKey},
},
)
if err != nil {
Expand Down Expand Up @@ -190,6 +192,7 @@ func (r *reporter) generateTag(args *ReportArgs, tags ...tag.Mutator) (context.C
tag.Insert(broker.UniqueTagKey, r.uniqueName),
tag.Insert(triggerFilterTypeKey, valueOrAny(args.filterType)),
tag.Insert(triggerFilterRequestTypeKey, args.requestType),
tag.Insert(triggerFilterRequestSchemeKey, args.requestScheme),
)...)
return ctx, err
}
Expand Down
10 changes: 6 additions & 4 deletions pkg/broker/filter/stats_reporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,10 +96,11 @@ func TestReporterEmptySourceAndTypeFilter(t *testing.T) {
setup()

args := &ReportArgs{
ns: "testns",
trigger: "testtrigger",
broker: "testbroker",
filterType: "",
ns: "testns",
trigger: "testtrigger",
broker: "testbroker",
filterType: "",
requestScheme: "http",
}

r := NewStatsReporter("testcontainer", "testpod")
Expand All @@ -110,6 +111,7 @@ func TestReporterEmptySourceAndTypeFilter(t *testing.T) {
metrics.LabelResponseCodeClass: "2xx",
broker.LabelContainerName: "testcontainer",
broker.LabelUniqueName: "testpod",
metrics.LabelEventScheme: "http",
}

resource := resource.Resource{
Expand Down
15 changes: 12 additions & 3 deletions pkg/broker/ingress/ingress_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -257,9 +257,18 @@ func (h *Handler) ServeHTTP(writer http.ResponseWriter, request *http.Request) {
}

reporterArgs := &ReportArgs{
ns: brokerNamespace,
broker: brokerName,
eventType: event.Type(),
ns: brokerNamespace,
broker: brokerName,
eventType: event.Type(),
eventScheme: request.URL.Scheme,
sadath-12 marked this conversation as resolved.
Show resolved Hide resolved
sadath-12 marked this conversation as resolved.
Show resolved Hide resolved
}

if reporterArgs.eventScheme == "" {
if request.TLS != nil {
reporterArgs.eventScheme = "https"
} else {
reporterArgs.eventScheme = "http"
}
}

statusCode, dispatchTime := h.receive(ctx, utils.PassThroughHeaders(request.Header), event, broker)
Expand Down
19 changes: 13 additions & 6 deletions pkg/broker/ingress/stats_reporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,14 +55,16 @@ var (
// - length between 1 and 255 inclusive
// - characters are printable US-ASCII
eventTypeKey = tag.MustNewKey(eventingmetrics.LabelEventType)
eventSchemeKey = tag.MustNewKey(eventingmetrics.LabelEventScheme)
responseCodeKey = tag.MustNewKey(eventingmetrics.LabelResponseCode)
responseCodeClassKey = tag.MustNewKey(eventingmetrics.LabelResponseCodeClass)
)

type ReportArgs struct {
ns string
broker string
eventType string
ns string
broker string
eventType string
eventScheme string
}

func init() {
Expand All @@ -75,8 +77,10 @@ type StatsReporter interface {
ReportEventDispatchTime(args *ReportArgs, responseCode int, d time.Duration) error
}

var _ StatsReporter = (*reporter)(nil)
var emptyContext = context.Background()
var (
_ StatsReporter = (*reporter)(nil)
emptyContext = context.Background()
)

// Reporter holds cached metric objects to report ingress metrics.
type reporter struct {
Expand All @@ -95,10 +99,12 @@ func NewStatsReporter(container, uniqueName string) StatsReporter {
func register() {
tagKeys := []tag.Key{
eventTypeKey,
eventSchemeKey,
responseCodeKey,
responseCodeClassKey,
broker.ContainerTagKey,
broker.UniqueTagKey}
broker.UniqueTagKey,
}

// Create view to see our measurements.
err := metrics.RegisterResourceView(
Expand Down Expand Up @@ -154,6 +160,7 @@ func (r *reporter) generateTag(args *ReportArgs, responseCode int) (context.Cont
tag.Insert(broker.ContainerTagKey, r.container),
tag.Insert(broker.UniqueTagKey, r.uniqueName),
tag.Insert(eventTypeKey, args.eventType),
tag.Insert(eventSchemeKey, args.eventScheme),
tag.Insert(responseCodeKey, strconv.Itoa(responseCode)),
tag.Insert(responseCodeClassKey, metrics.ResponseCodeClass(responseCode)))
}
8 changes: 5 additions & 3 deletions pkg/broker/ingress/stats_reporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,10 @@ func TestStatsReporter(t *testing.T) {
setup()

args := &ReportArgs{
ns: "testns",
broker: "testbroker",
eventType: "testeventtype",
ns: "testns",
broker: "testbroker",
eventType: "testeventtype",
eventScheme: "http",
}

r := NewStatsReporter("testcontainer", "testpod")
Expand All @@ -45,6 +46,7 @@ func TestStatsReporter(t *testing.T) {
metrics.LabelResponseCodeClass: "2xx",
broker.LabelUniqueName: "testpod",
broker.LabelContainerName: "testcontainer",
metrics.LabelEventScheme: "http",
}

resource := resource.Resource{
Expand Down
10 changes: 10 additions & 0 deletions pkg/channel/event_receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -231,6 +231,16 @@ func (r *EventReceiver) ServeHTTP(response nethttp.ResponseWriter, request *neth

args.Ns = channel.Namespace

args.EventScheme = request.URL.Scheme
sadath-12 marked this conversation as resolved.
Show resolved Hide resolved

if args.EventScheme == "" {
if request.TLS != nil {
args.EventScheme = "https"
} else {
args.EventScheme = "http"
}
}

event, err := http.NewEventFromHTTPRequest(request)
if err != nil {
r.logger.Warn("failed to extract event from request", zap.Error(err))
Expand Down
Loading