diff --git a/pkg/adapter/v2/cloudevents.go b/pkg/adapter/v2/cloudevents.go index 316f47b9e84..0a17fa55f75 100644 --- a/pkg/adapter/v2/cloudevents.go +++ b/pkg/adapter/v2/cloudevents.go @@ -210,11 +210,19 @@ func NewClient(cfg ClientConfig) (Client, error) { reporter: cfg.Reporter, crStatusEventClient: cfg.CrStatusEventClient, oidcTokenProvider: cfg.TokenProvider, + scheme: "http", } if cfg.Env != nil { client.audience = cfg.Env.GetAudience() client.oidcServiceAccountName = cfg.Env.GetOIDCServiceAccountName() + sinkURI := cfg.Env.GetSink() + if sinkURI != "" { + parsedUrl, err := url.Parse(sinkURI) + if err == nil { + client.scheme = parsedUrl.Scheme + } + } } return client, nil @@ -234,12 +242,12 @@ func setTimeOut(duration time.Duration) http.Option { } type client struct { - ceClient cloudevents.Client - ceOverrides *duckv1.CloudEventOverrides - reporter source.StatsReporter - crStatusEventClient *crstatusevent.CRStatusEventClient - closeIdler closeIdler - + ceClient cloudevents.Client + ceOverrides *duckv1.CloudEventOverrides + reporter source.StatsReporter + crStatusEventClient *crstatusevent.CRStatusEventClient + closeIdler closeIdler + scheme string oidcTokenProvider *auth.OIDCTokenProvider audience *string oidcServiceAccountName *types.NamespacedName @@ -302,6 +310,7 @@ func (c *client) reportMetrics(ctx context.Context, event cloudevents.Event, res if c.reporter == nil { return } + tags := MetricTagFromContext(ctx) reportArgs := &source.ReportArgs{ Namespace: tags.Namespace, @@ -309,6 +318,7 @@ func (c *client) reportMetrics(ctx context.Context, event cloudevents.Event, res EventType: event.Type(), Name: tags.Name, ResourceGroup: tags.ResourceGroup, + EventScheme: c.scheme, } var rres *http.RetriesResult diff --git a/pkg/broker/filter/filter_handler.go b/pkg/broker/filter/filter_handler.go index d308a88f230..ade7c035921 100644 --- a/pkg/broker/filter/filter_handler.go +++ b/pkg/broker/filter/filter_handler.go @@ -241,6 +241,12 @@ func (h *Handler) handleDispatchToReplyRequest(ctx context.Context, trigger *eve requestType: "reply_forward", } + if request.TLS != nil { + reportArgs.requestScheme = "https" + } else { + reportArgs.requestScheme = "http" + } + h.logger.Info("sending to reply", zap.Any("target", target)) // since the broker-filter acts here like a proxy, we don't filter headers @@ -277,6 +283,12 @@ func (h *Handler) handleDispatchToDLSRequest(ctx context.Context, trigger *event requestType: "dls_forward", } + if request.TLS != nil { + reportArgs.requestScheme = "https" + } else { + reportArgs.requestScheme = "http" + } + h.logger.Info("sending to dls", zap.Any("target", target)) // since the broker-filter acts here like a proxy, we don't filter headers @@ -312,6 +324,12 @@ func (h *Handler) handleDispatchToSubscriberRequest(ctx context.Context, trigger requestType: "filter", } + if request.TLS != nil { + reportArgs.requestScheme = "https" + } else { + reportArgs.requestScheme = "http" + } + subscriberURI := trigger.Status.SubscriberURI if subscriberURI == nil { // Record the event count. @@ -342,7 +360,6 @@ func (h *Handler) handleDispatchToSubscriberRequest(ctx context.Context, trigger } func (h *Handler) send(ctx context.Context, writer http.ResponseWriter, headers http.Header, target duckv1.Addressable, reportArgs *ReportArgs, event *cloudevents.Event, t *eventingv1.Trigger, ttl int32) { - additionalHeaders := headers.Clone() additionalHeaders.Set(apis.KnNamespaceHeader, t.GetNamespace()) diff --git a/pkg/broker/filter/stats_reporter.go b/pkg/broker/filter/stats_reporter.go index 246e4d52bbf..c2df9b3c9b5 100644 --- a/pkg/broker/filter/stats_reporter.go +++ b/pkg/broker/filter/stats_reporter.go @@ -67,18 +67,20 @@ var ( // go.opencensus.io/tag/validate.go. Currently those restrictions are: // - length between 1 and 255 inclusive // - characters are printable US-ASCII - triggerFilterTypeKey = tag.MustNewKey(eventingmetrics.LabelFilterType) - triggerFilterRequestTypeKey = tag.MustNewKey("filter_request_type") - responseCodeKey = tag.MustNewKey(eventingmetrics.LabelResponseCode) - responseCodeClassKey = tag.MustNewKey(eventingmetrics.LabelResponseCodeClass) + triggerFilterTypeKey = tag.MustNewKey(eventingmetrics.LabelFilterType) + triggerFilterRequestTypeKey = tag.MustNewKey("filter_request_type") + triggerFilterRequestSchemeKey = tag.MustNewKey(eventingmetrics.LabelEventScheme) + responseCodeKey = tag.MustNewKey(eventingmetrics.LabelResponseCode) + responseCodeClassKey = tag.MustNewKey(eventingmetrics.LabelResponseCodeClass) ) type ReportArgs struct { - ns string - trigger string - broker string - filterType string - requestType string + ns string + trigger string + broker string + filterType string + requestType string + requestScheme string } func init() { @@ -116,19 +118,19 @@ func register() { Description: eventCountM.Description(), Measure: eventCountM, Aggregation: view.Count(), - TagKeys: []tag.Key{triggerFilterTypeKey, triggerFilterRequestTypeKey, responseCodeKey, responseCodeClassKey, broker.UniqueTagKey, broker.ContainerTagKey}, + TagKeys: []tag.Key{triggerFilterTypeKey, triggerFilterRequestTypeKey, triggerFilterRequestSchemeKey, responseCodeKey, responseCodeClassKey, broker.UniqueTagKey, broker.ContainerTagKey}, }, &view.View{ Description: dispatchTimeInMsecM.Description(), Measure: dispatchTimeInMsecM, Aggregation: view.Distribution(metrics.Buckets125(1, 10000)...), // 1, 2, 5, 10, 20, 50, 100, 1000, 5000, 10000 - TagKeys: []tag.Key{triggerFilterTypeKey, triggerFilterRequestTypeKey, responseCodeKey, responseCodeClassKey, broker.UniqueTagKey, broker.ContainerTagKey}, + TagKeys: []tag.Key{triggerFilterTypeKey, triggerFilterRequestTypeKey, triggerFilterRequestSchemeKey, responseCodeKey, responseCodeClassKey, broker.UniqueTagKey, broker.ContainerTagKey}, }, &view.View{ Description: processingTimeInMsecM.Description(), Measure: processingTimeInMsecM, Aggregation: view.Distribution(metrics.Buckets125(1, 10000)...), // 1, 2, 5, 10, 20, 50, 100, 1000, 5000, 10000 - TagKeys: []tag.Key{triggerFilterTypeKey, triggerFilterRequestTypeKey, broker.UniqueTagKey, broker.ContainerTagKey}, + TagKeys: []tag.Key{triggerFilterTypeKey, triggerFilterRequestTypeKey, triggerFilterRequestSchemeKey, broker.UniqueTagKey, broker.ContainerTagKey}, }, ) if err != nil { @@ -190,6 +192,7 @@ func (r *reporter) generateTag(args *ReportArgs, tags ...tag.Mutator) (context.C tag.Insert(broker.UniqueTagKey, r.uniqueName), tag.Insert(triggerFilterTypeKey, valueOrAny(args.filterType)), tag.Insert(triggerFilterRequestTypeKey, args.requestType), + tag.Insert(triggerFilterRequestSchemeKey, args.requestScheme), )...) return ctx, err } diff --git a/pkg/broker/filter/stats_reporter_test.go b/pkg/broker/filter/stats_reporter_test.go index 5d08d605d7e..da1635dfe0f 100644 --- a/pkg/broker/filter/stats_reporter_test.go +++ b/pkg/broker/filter/stats_reporter_test.go @@ -96,10 +96,11 @@ func TestReporterEmptySourceAndTypeFilter(t *testing.T) { setup() args := &ReportArgs{ - ns: "testns", - trigger: "testtrigger", - broker: "testbroker", - filterType: "", + ns: "testns", + trigger: "testtrigger", + broker: "testbroker", + filterType: "", + requestScheme: "http", } r := NewStatsReporter("testcontainer", "testpod") @@ -110,6 +111,7 @@ func TestReporterEmptySourceAndTypeFilter(t *testing.T) { metrics.LabelResponseCodeClass: "2xx", broker.LabelContainerName: "testcontainer", broker.LabelUniqueName: "testpod", + metrics.LabelEventScheme: "http", } resource := resource.Resource{ diff --git a/pkg/broker/ingress/ingress_handler.go b/pkg/broker/ingress/ingress_handler.go index 9deb9dcdfc1..577c308f930 100644 --- a/pkg/broker/ingress/ingress_handler.go +++ b/pkg/broker/ingress/ingress_handler.go @@ -262,6 +262,12 @@ func (h *Handler) ServeHTTP(writer http.ResponseWriter, request *http.Request) { eventType: event.Type(), } + if request.TLS != nil { + reporterArgs.eventScheme = "https" + } else { + reporterArgs.eventScheme = "http" + } + statusCode, dispatchTime := h.receive(ctx, utils.PassThroughHeaders(request.Header), event, broker) if dispatchTime > kncloudevents.NoDuration { _ = h.Reporter.ReportEventDispatchTime(reporterArgs, statusCode, dispatchTime) diff --git a/pkg/broker/ingress/stats_reporter.go b/pkg/broker/ingress/stats_reporter.go index 091ca0f0d99..209981e76f4 100644 --- a/pkg/broker/ingress/stats_reporter.go +++ b/pkg/broker/ingress/stats_reporter.go @@ -55,14 +55,16 @@ var ( // - length between 1 and 255 inclusive // - characters are printable US-ASCII eventTypeKey = tag.MustNewKey(eventingmetrics.LabelEventType) + eventSchemeKey = tag.MustNewKey(eventingmetrics.LabelEventScheme) responseCodeKey = tag.MustNewKey(eventingmetrics.LabelResponseCode) responseCodeClassKey = tag.MustNewKey(eventingmetrics.LabelResponseCodeClass) ) type ReportArgs struct { - ns string - broker string - eventType string + ns string + broker string + eventType string + eventScheme string } func init() { @@ -75,8 +77,10 @@ type StatsReporter interface { ReportEventDispatchTime(args *ReportArgs, responseCode int, d time.Duration) error } -var _ StatsReporter = (*reporter)(nil) -var emptyContext = context.Background() +var ( + _ StatsReporter = (*reporter)(nil) + emptyContext = context.Background() +) // Reporter holds cached metric objects to report ingress metrics. type reporter struct { @@ -95,10 +99,12 @@ func NewStatsReporter(container, uniqueName string) StatsReporter { func register() { tagKeys := []tag.Key{ eventTypeKey, + eventSchemeKey, responseCodeKey, responseCodeClassKey, broker.ContainerTagKey, - broker.UniqueTagKey} + broker.UniqueTagKey, + } // Create view to see our measurements. err := metrics.RegisterResourceView( @@ -154,6 +160,7 @@ func (r *reporter) generateTag(args *ReportArgs, responseCode int) (context.Cont tag.Insert(broker.ContainerTagKey, r.container), tag.Insert(broker.UniqueTagKey, r.uniqueName), tag.Insert(eventTypeKey, args.eventType), + tag.Insert(eventSchemeKey, args.eventScheme), tag.Insert(responseCodeKey, strconv.Itoa(responseCode)), tag.Insert(responseCodeClassKey, metrics.ResponseCodeClass(responseCode))) } diff --git a/pkg/broker/ingress/stats_reporter_test.go b/pkg/broker/ingress/stats_reporter_test.go index 0afe3ccfc5b..b948ed37dce 100644 --- a/pkg/broker/ingress/stats_reporter_test.go +++ b/pkg/broker/ingress/stats_reporter_test.go @@ -32,9 +32,10 @@ func TestStatsReporter(t *testing.T) { setup() args := &ReportArgs{ - ns: "testns", - broker: "testbroker", - eventType: "testeventtype", + ns: "testns", + broker: "testbroker", + eventType: "testeventtype", + eventScheme: "http", } r := NewStatsReporter("testcontainer", "testpod") @@ -45,6 +46,7 @@ func TestStatsReporter(t *testing.T) { metrics.LabelResponseCodeClass: "2xx", broker.LabelUniqueName: "testpod", broker.LabelContainerName: "testcontainer", + metrics.LabelEventScheme: "http", } resource := resource.Resource{ diff --git a/pkg/channel/event_receiver.go b/pkg/channel/event_receiver.go index 1e3ff3c2cc7..ffb441cbcce 100644 --- a/pkg/channel/event_receiver.go +++ b/pkg/channel/event_receiver.go @@ -231,6 +231,12 @@ func (r *EventReceiver) ServeHTTP(response nethttp.ResponseWriter, request *neth args.Ns = channel.Namespace + if request.TLS != nil { + args.EventScheme = "https" + } else { + args.EventScheme = "http" + } + event, err := http.NewEventFromHTTPRequest(request) if err != nil { r.logger.Warn("failed to extract event from request", zap.Error(err)) diff --git a/pkg/channel/fanout/fanout_event_handler.go b/pkg/channel/fanout/fanout_event_handler.go index f59361fe398..b4700f19afd 100644 --- a/pkg/channel/fanout/fanout_event_handler.go +++ b/pkg/channel/fanout/fanout_event_handler.go @@ -93,6 +93,8 @@ type FanoutEventHandler struct { eventTypeHandler *eventtype.EventTypeAutoHandler channelRef *duckv1.KReference channelUID *types.UID + hasHttpSubs bool + hasHttpsSubs bool } // NewFanoutEventHandler creates a new fanout.EventHandler. @@ -105,7 +107,6 @@ func NewFanoutEventHandler( channelUID *types.UID, eventDispatcher *kncloudevents.Dispatcher, receiverOpts ...channel.EventReceiverOptions, - ) (*FanoutEventHandler, error) { handler := &FanoutEventHandler{ logger: logger, @@ -117,8 +118,9 @@ func NewFanoutEventHandler( channelUID: channelUID, eventDispatcher: eventDispatcher, } - handler.subscriptions = make([]Subscription, len(config.Subscriptions)) - copy(handler.subscriptions, config.Subscriptions) + + handler.SetSubscriptions(context.Background(), config.Subscriptions) + // The receiver function needs to point back at the handler itself, so set it up after // initialization. receiver, err := channel.NewEventReceiver(createEventReceiverFunction(handler), logger, reporter, receiverOpts...) @@ -174,14 +176,27 @@ func (f *FanoutEventHandler) SetSubscriptions(ctx context.Context, subs []Subscr s := make([]Subscription, len(subs)) copy(s, subs) f.subscriptions = s + + for _, sub := range f.subscriptions { + if sub.Subscriber.URL != nil && sub.Subscriber.URL.Scheme == "https" { + f.hasHttpsSubs = true + } else { + f.hasHttpSubs = true + } + } } func (f *FanoutEventHandler) GetSubscriptions(ctx context.Context) []Subscription { + ret, _, _ := f.getSubscriptionsWithScheme() + return ret +} + +func (f *FanoutEventHandler) getSubscriptionsWithScheme() (ret []Subscription, hasHttpSubs bool, hasHttpsSubs bool) { f.subscriptionsMutex.RLock() defer f.subscriptionsMutex.RUnlock() - ret := make([]Subscription, len(f.subscriptions)) + ret = make([]Subscription, len(f.subscriptions)) copy(ret, f.subscriptions) - return ret + return ret, f.hasHttpSubs, f.hasHttpsSubs } func (f *FanoutEventHandler) autoCreateEventType(ctx context.Context, evnt event.Event) { @@ -208,7 +223,7 @@ func createEventReceiverFunction(f *FanoutEventHandler) func(context.Context, ch f.autoCreateEventType(ctx, evnt) } - subs := f.GetSubscriptions(ctx) + subs, hasHttpSubs, hasHttpsSubs := f.getSubscriptionsWithScheme() if len(subs) == 0 { // Nothing to do here @@ -227,6 +242,15 @@ func createEventReceiverFunction(f *FanoutEventHandler) func(context.Context, ch // Any returned error is already logged in f.dispatch(). dispatchResultForFanout := f.dispatch(ctx, subs, e, h) _ = ParseDispatchResultAndReportMetrics(dispatchResultForFanout, *r, *args) + // If there are both http and https subscribers, we need to report the metrics for both of the type + if hasHttpSubs { + reportArgs.EventScheme = "http" + _ = ParseDispatchResultAndReportMetrics(dispatchResultForFanout, *r, *args) + } + if hasHttpsSubs { + reportArgs.EventScheme = "https" + _ = ParseDispatchResultAndReportMetrics(dispatchResultForFanout, *r, *args) + } }(evnt, additionalHeaders, parentSpan, &f.reporter, &reportArgs) return nil } @@ -236,7 +260,7 @@ func createEventReceiverFunction(f *FanoutEventHandler) func(context.Context, ch f.autoCreateEventType(ctx, event) } - subs := f.GetSubscriptions(ctx) + subs, hasHttpSubs, hasHttpsSubs := f.getSubscriptionsWithScheme() if len(subs) == 0 { // Nothing to do here return nil @@ -245,9 +269,21 @@ func createEventReceiverFunction(f *FanoutEventHandler) func(context.Context, ch reportArgs := channel.ReportArgs{} reportArgs.EventType = event.Type() reportArgs.Ns = ref.Namespace + additionalHeaders.Set(apis.KnNamespaceHeader, ref.Namespace) dispatchResultForFanout := f.dispatch(ctx, subs, event, additionalHeaders) - return ParseDispatchResultAndReportMetrics(dispatchResultForFanout, f.reporter, reportArgs) + err := ParseDispatchResultAndReportMetrics(dispatchResultForFanout, f.reporter, reportArgs) + // If there are both http and https subscribers, we need to report the metrics for both of the type + // In this case we report http metrics because above we checked first for https and reported it so the left over metric to report is for http + if hasHttpSubs { + reportArgs.EventScheme = "http" + err = ParseDispatchResultAndReportMetrics(dispatchResultForFanout, f.reporter, reportArgs) + } + if hasHttpsSubs { + reportArgs.EventScheme = "https" + err = ParseDispatchResultAndReportMetrics(dispatchResultForFanout, f.reporter, reportArgs) + } + return err } } diff --git a/pkg/channel/stats_reporter.go b/pkg/channel/stats_reporter.go index ce84a1a9d90..7bc9821358f 100644 --- a/pkg/channel/stats_reporter.go +++ b/pkg/channel/stats_reporter.go @@ -54,13 +54,15 @@ var ( // - characters are printable US-ASCII namespaceKey = tag.MustNewKey(eventingmetrics.LabelNamespaceName) eventTypeKey = tag.MustNewKey(eventingmetrics.LabelEventType) + eventScheme = tag.MustNewKey(eventingmetrics.LabelEventScheme) responseCodeKey = tag.MustNewKey(eventingmetrics.LabelResponseCode) responseCodeClassKey = tag.MustNewKey(eventingmetrics.LabelResponseCodeClass) ) type ReportArgs struct { - Ns string - EventType string + Ns string + EventType string + EventScheme string } func init() { @@ -93,6 +95,7 @@ func register() { tagKeys := []tag.Key{ namespaceKey, eventTypeKey, + eventScheme, responseCodeKey, responseCodeClassKey, UniqueTagKey, @@ -145,6 +148,7 @@ func (r *reporter) generateTag(args *ReportArgs, responseCode int) (context.Cont emptyContext, tag.Insert(namespaceKey, args.Ns), tag.Insert(eventTypeKey, args.EventType), + tag.Insert(eventScheme, args.EventScheme), tag.Insert(responseCodeKey, strconv.Itoa(responseCode)), tag.Insert(responseCodeClassKey, metrics.ResponseCodeClass(responseCode)), tag.Insert(ContainerTagKey, r.container), diff --git a/pkg/channel/stats_reporter_test.go b/pkg/channel/stats_reporter_test.go index 6ca647ad654..9038fe6eafe 100644 --- a/pkg/channel/stats_reporter_test.go +++ b/pkg/channel/stats_reporter_test.go @@ -29,8 +29,9 @@ import ( func TestStatsReporter(t *testing.T) { setup() args := &ReportArgs{ - Ns: "testns", - EventType: "testeventtype", + Ns: "testns", + EventType: "testeventtype", + EventScheme: "http", } r := NewStatsReporter("testcontainer", "testpod") @@ -42,6 +43,7 @@ func TestStatsReporter(t *testing.T) { metrics.LabelResponseCodeClass: "2xx", LabelUniqueName: "testpod", LabelContainerName: "testcontainer", + metrics.LabelEventScheme: "http", } // test ReportEventCount diff --git a/pkg/metrics/metrics.go b/pkg/metrics/metrics.go index 23723883162..363673b101f 100644 --- a/pkg/metrics/metrics.go +++ b/pkg/metrics/metrics.go @@ -45,6 +45,9 @@ const ( // LabelEventType is the label for the name of the event type. LabelEventType = "event_type" + // LabelEventType is the label for the name of the event type. + LabelEventScheme = "event_scheme" + // LabelEventSource is the label for the name of the event source. LabelEventSource = "event_source" diff --git a/pkg/metrics/source/stats_reporter.go b/pkg/metrics/source/stats_reporter.go index 9fcce5ca338..3c39c582363 100644 --- a/pkg/metrics/source/stats_reporter.go +++ b/pkg/metrics/source/stats_reporter.go @@ -50,6 +50,7 @@ var ( namespaceKey = tag.MustNewKey(eventingmetrics.LabelNamespaceName) eventSourceKey = tag.MustNewKey(eventingmetrics.LabelEventSource) eventTypeKey = tag.MustNewKey(eventingmetrics.LabelEventType) + eventScheme = tag.MustNewKey(eventingmetrics.LabelEventScheme) sourceNameKey = tag.MustNewKey(eventingmetrics.LabelName) sourceResourceGroupKey = tag.MustNewKey(eventingmetrics.LabelResourceGroup) responseCodeKey = tag.MustNewKey(eventingmetrics.LabelResponseCode) @@ -62,6 +63,7 @@ var ( type ReportArgs struct { Namespace string EventType string + EventScheme string EventSource string Name string ResourceGroup string @@ -122,6 +124,7 @@ func (r *reporter) generateTag(args *ReportArgs, responseCode int) (context.Cont tag.Insert(namespaceKey, args.Namespace), tag.Insert(eventSourceKey, args.EventSource), tag.Insert(eventTypeKey, args.EventType), + tag.Insert(eventScheme, args.EventScheme), tag.Insert(sourceNameKey, args.Name), tag.Insert(sourceResourceGroupKey, args.ResourceGroup), metrics.MaybeInsertIntTag(responseCodeKey, responseCode, responseCode > 0), @@ -135,12 +138,14 @@ func register() { namespaceKey, eventSourceKey, eventTypeKey, + eventScheme, sourceNameKey, sourceResourceGroupKey, responseCodeKey, responseCodeClassKey, responseError, - responseTimeout} + responseTimeout, + } // Create view to see our measurements. if err := view.Register( diff --git a/pkg/metrics/source/stats_reporter_test.go b/pkg/metrics/source/stats_reporter_test.go index da892865fe8..d039fff29c0 100644 --- a/pkg/metrics/source/stats_reporter_test.go +++ b/pkg/metrics/source/stats_reporter_test.go @@ -34,6 +34,7 @@ func TestStatsReporter(t *testing.T) { EventSource: "unit-test", Name: "testsource", ResourceGroup: "testresourcegroup", + EventScheme: "http", } r, err := NewStatsReporter() @@ -49,6 +50,7 @@ func TestStatsReporter(t *testing.T) { metrics.LabelResourceGroup: "testresourcegroup", metrics.LabelResponseCode: "202", metrics.LabelResponseCodeClass: "2xx", + metrics.LabelEventScheme: "http", } retryWantTags := map[string]string{ @@ -59,6 +61,7 @@ func TestStatsReporter(t *testing.T) { metrics.LabelResourceGroup: "testresourcegroup", metrics.LabelResponseCode: "503", metrics.LabelResponseCodeClass: "5xx", + metrics.LabelEventScheme: "http", } // test ReportEventCount and ReportRetryEventCount @@ -85,7 +88,8 @@ func TestBadValues(t *testing.T) { } args := &ReportArgs{ - Namespace: "😀", + Namespace: "😀", + EventScheme: "http", } if err := r.ReportEventCount(args, 200); err == nil {