diff --git a/pkg/adapter/mtping/runner.go b/pkg/adapter/mtping/runner.go index d86dabf9d02..817d7b15adf 100644 --- a/pkg/adapter/mtping/runner.go +++ b/pkg/adapter/mtping/runner.go @@ -192,7 +192,7 @@ func (a *cronJobsRunner) newPingSourceClient(source *sourcesv1.PingSource) (adap var env adapter.EnvConfig if a.clientConfig.Env != nil { env = adapter.EnvConfig{ - Namespace: a.clientConfig.Env.GetNamespace(), + Namespace: source.GetNamespace(), Name: a.clientConfig.Env.GetName(), EnvSinkTimeout: fmt.Sprintf("%d", a.clientConfig.Env.GetSinktimeout()), } diff --git a/pkg/adapter/v2/cloudevents.go b/pkg/adapter/v2/cloudevents.go index 43089121320..556d888126b 100644 --- a/pkg/adapter/v2/cloudevents.go +++ b/pkg/adapter/v2/cloudevents.go @@ -35,6 +35,7 @@ import ( "knative.dev/pkg/tracing/propagation/tracecontextb3" "knative.dev/eventing/pkg/adapter/v2/util/crstatusevent" + "knative.dev/eventing/pkg/apis" "knative.dev/eventing/pkg/eventingtls" "knative.dev/eventing/pkg/metrics/source" obsclient "knative.dev/eventing/pkg/observability/client" @@ -171,6 +172,8 @@ func NewClient(cfg ClientConfig) (Client, error) { return nil, err } } + + pOpts = append(pOpts, http.WithHeader(apis.KnNamespaceHeader, cfg.Env.GetNamespace())) } pOpts = append(pOpts, http.WithRoundTripper(transport)) diff --git a/pkg/apis/http.go b/pkg/apis/http.go new file mode 100644 index 00000000000..30f8f4d8c06 --- /dev/null +++ b/pkg/apis/http.go @@ -0,0 +1,21 @@ +/* +Copyright 2023 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package apis + +const ( + KnNamespaceHeader = "Kn-Namespace" +) diff --git a/pkg/broker/filter/filter_handler.go b/pkg/broker/filter/filter_handler.go index 64656a91ace..4da9199365b 100644 --- a/pkg/broker/filter/filter_handler.go +++ b/pkg/broker/filter/filter_handler.go @@ -34,13 +34,15 @@ import ( "go.opencensus.io/trace" "go.uber.org/zap" "k8s.io/client-go/tools/cache" - channelAttributes "knative.dev/eventing/pkg/channel/attributes" duckv1 "knative.dev/pkg/apis/duck/v1" "knative.dev/pkg/logging" + "knative.dev/eventing/pkg/apis" + channelAttributes "knative.dev/eventing/pkg/channel/attributes" + eventingv1 "knative.dev/eventing/pkg/apis/eventing/v1" "knative.dev/eventing/pkg/apis/feature" - broker "knative.dev/eventing/pkg/broker" + "knative.dev/eventing/pkg/broker" v1 "knative.dev/eventing/pkg/client/informers/externalversions/eventing/v1" eventinglisters "knative.dev/eventing/pkg/client/listers/eventing/v1" "knative.dev/eventing/pkg/eventfilter" @@ -240,12 +242,12 @@ func (h *Handler) ServeHTTP(writer http.ResponseWriter, request *http.Request) { URL: t.Status.SubscriberURI, CACerts: t.Status.SubscriberCACerts, } - h.send(ctx, writer, request.Header, target, reportArgs, event, ttl) + h.send(ctx, writer, request.Header, target, reportArgs, event, t, ttl) } -func (h *Handler) send(ctx context.Context, writer http.ResponseWriter, headers http.Header, target duckv1.Addressable, reportArgs *ReportArgs, event *cloudevents.Event, ttl int32) { +func (h *Handler) send(ctx context.Context, writer http.ResponseWriter, headers http.Header, target duckv1.Addressable, reportArgs *ReportArgs, event *cloudevents.Event, t *eventingv1.Trigger, ttl int32) { // send the event to trigger's subscriber - response, responseErr := h.sendEvent(ctx, headers, target, event, reportArgs) + response, responseErr := h.sendEvent(ctx, headers, target, event, t, reportArgs) if responseErr.err != nil { h.logger.Error("failed to send event", zap.Error(responseErr.err)) @@ -288,7 +290,7 @@ func (h *Handler) send(ctx context.Context, writer http.ResponseWriter, headers _ = h.reporter.ReportEventCount(reportArgs, statusCode) } -func (h *Handler) sendEvent(ctx context.Context, headers http.Header, target duckv1.Addressable, event *cloudevents.Event, reporterArgs *ReportArgs) (*http.Response, ErrHandler) { +func (h *Handler) sendEvent(ctx context.Context, headers http.Header, target duckv1.Addressable, event *cloudevents.Event, t *eventingv1.Trigger, reporterArgs *ReportArgs) (*http.Response, ErrHandler) { responseErr := ErrHandler{ ResponseCode: NoResponse, } @@ -304,6 +306,7 @@ func (h *Handler) sendEvent(ctx context.Context, headers http.Header, target duc defer message.Finish(nil) additionalHeaders := utils.PassThroughHeaders(headers) + additionalHeaders.Set(apis.KnNamespaceHeader, t.GetNamespace()) // Following the spec https://github.com/knative/specs/blob/main/specs/eventing/data-plane.md#derived-reply-events additionalHeaders.Set("prefer", "reply") diff --git a/pkg/channel/fanout/fanout_message_handler.go b/pkg/channel/fanout/fanout_message_handler.go index 414ede173eb..a1be0a86345 100644 --- a/pkg/channel/fanout/fanout_message_handler.go +++ b/pkg/channel/fanout/fanout_message_handler.go @@ -32,10 +32,12 @@ import ( "github.com/cloudevents/sdk-go/v2/binding/buffering" "go.opencensus.io/trace" "go.uber.org/zap" + duckv1 "knative.dev/pkg/apis/duck/v1" + + "knative.dev/eventing/pkg/apis" eventingduckv1 "knative.dev/eventing/pkg/apis/duck/v1" "knative.dev/eventing/pkg/channel" "knative.dev/eventing/pkg/kncloudevents" - duckv1 "knative.dev/pkg/apis/duck/v1" ) const ( @@ -191,6 +193,7 @@ func createMessageReceiverFunction(f *FanoutMessageHandler) func(context.Context go func(m binding.Message, h nethttp.Header, s *trace.Span, r *channel.StatsReporter, args *channel.ReportArgs) { // Run async dispatch with background context. ctx = trace.NewContext(context.Background(), s) + h.Set(apis.KnNamespaceHeader, ref.Namespace) // Any returned error is already logged in f.dispatch(). dispatchResultForFanout := f.dispatch(ctx, subs, m, h) _ = ParseDispatchResultAndReportMetrics(dispatchResultForFanout, *r, *args) diff --git a/pkg/channel/message_dispatcher.go b/pkg/channel/message_dispatcher.go index ef093942d32..2806aa75909 100644 --- a/pkg/channel/message_dispatcher.go +++ b/pkg/channel/message_dispatcher.go @@ -38,6 +38,8 @@ import ( "knative.dev/pkg/network" "knative.dev/pkg/system" + eventingapis "knative.dev/eventing/pkg/apis" + "knative.dev/eventing/pkg/broker" "knative.dev/eventing/pkg/channel/attributes" "knative.dev/eventing/pkg/kncloudevents" @@ -148,6 +150,13 @@ func (d *MessageDispatcherImpl) DispatchMessageWithRetries(ctx context.Context, responseAdditionalHeaders = additionalHeaders } + if additionalHeaders.Get(eventingapis.KnNamespaceHeader) != "" { + if responseAdditionalHeaders == nil { + responseAdditionalHeaders = make(nethttp.Header) + } + responseAdditionalHeaders.Set(eventingapis.KnNamespaceHeader, additionalHeaders.Get(eventingapis.KnNamespaceHeader)) + } + // No response, dispatch completed if responseMessage == nil { return dispatchExecutionInfo, nil diff --git a/test/rekt/features/broker/feature.go b/test/rekt/features/broker/feature.go index 58ff8bc2b3b..8da2349465d 100644 --- a/test/rekt/features/broker/feature.go +++ b/test/rekt/features/broker/feature.go @@ -26,9 +26,11 @@ import ( "github.com/cloudevents/sdk-go/v2/binding/spec" "github.com/cloudevents/sdk-go/v2/test" "github.com/google/uuid" + "knative.dev/reconciler-test/pkg/environment" duckv1 "knative.dev/eventing/pkg/apis/duck/v1" eventingv1 "knative.dev/eventing/pkg/apis/eventing/v1" + "knative.dev/eventing/test/rekt/features" "knative.dev/eventing/test/rekt/resources/broker" "knative.dev/eventing/test/rekt/resources/channel" "knative.dev/eventing/test/rekt/resources/subscription" @@ -186,10 +188,12 @@ func BrokerWithManyTriggers() *feature.Feature { for _, matcher := range matchers { // One match per event is enough f.Stable("test message without explicit prefer header should have the header"). - Must("delivers events", - eventasssert.OnStore(sink).Match( - matcher, - ).AtLeast(1)) + Must("delivers events", func(ctx context.Context, t feature.T) { + eventasssert.OnStore(sink). + Match(features.HasKnNamespaceHeader(environment.FromContext(ctx).Namespace())). + Match(matcher). + AtLeast(1)(ctx, t) + }) } } } @@ -628,14 +632,19 @@ func brokerRedeliveryDropN(retryNum int32, dropNum uint) *feature.Feature { f.Stable("Broker Redelivery failed the first n events"). Must("delivers events", - eventasssert.OnStore(sink).Match( - eventasssert.MatchKind(eventasssert.EventReceived), - eventasssert.MatchEvent( - test.HasSource(eventSource), - test.HasType(eventType), - test.HasData([]byte(eventBody)), - ), - ).AtLeast(1)) + func(ctx context.Context, t feature.T) { + eventasssert.OnStore(sink). + Match(features.HasKnNamespaceHeader(environment.FromContext(ctx).Namespace())). + Match( + eventasssert.MatchKind(eventasssert.EventReceived), + eventasssert.MatchEvent( + test.HasSource(eventSource), + test.HasType(eventType), + test.HasData([]byte(eventBody)), + ), + ). + AtLeast(1)(ctx, t) + }) return f } @@ -693,11 +702,14 @@ func brokerSubscriberUnreachable() *feature.Feature { )) f.Assert("Receives dls extensions when subscriber is unreachable", - eventasssert.OnStore(sink). - MatchEvent( - test.HasExtension("knativeerrordest", "http://fake.svc.cluster.local"), - ). - AtLeast(1), + func(ctx context.Context, t feature.T) { + eventasssert.OnStore(sink). + Match(features.HasKnNamespaceHeader(environment.FromContext(ctx).Namespace())). + MatchEvent( + test.HasExtension("knativeerrordest", "http://fake.svc.cluster.local"), + ). + AtLeast(1)(ctx, t) + }, ) return f } diff --git a/test/rekt/features/channel/features.go b/test/rekt/features/channel/features.go index b929f36783b..788fd41d7f8 100644 --- a/test/rekt/features/channel/features.go +++ b/test/rekt/features/channel/features.go @@ -26,6 +26,7 @@ import ( "github.com/cloudevents/sdk-go/v2/test" "github.com/google/uuid" duckv1 "knative.dev/pkg/apis/duck/v1" + "knative.dev/reconciler-test/pkg/environment" "knative.dev/reconciler-test/pkg/eventshub" "knative.dev/reconciler-test/pkg/feature" "knative.dev/reconciler-test/pkg/manifest" @@ -35,6 +36,7 @@ import ( eventasssert "knative.dev/reconciler-test/pkg/eventshub/assert" + "knative.dev/eventing/test/rekt/features" "knative.dev/eventing/test/rekt/resources/channel" "knative.dev/eventing/test/rekt/resources/channel_impl" "knative.dev/eventing/test/rekt/resources/containersource" @@ -107,10 +109,12 @@ func DeadLetterSink(createSubscriberFn func(ref *duckv1.KReference, uri string) f.Requirement("containersource is ready", containersource.IsReady(cs)) f.Requirement("Channel has dead letter sink uri", channel_impl.HasDeadLetterSinkURI(name, channel_impl.GVR())) - f.Assert("dls receives events", assert.OnStore(sink). - MatchEvent(test.HasType("dev.knative.eventing.samples.heartbeat")). - AtLeast(1), - ) + f.Assert("dls receives events", func(ctx context.Context, t feature.T) { + assert.OnStore(sink). + Match(features.HasKnNamespaceHeader(environment.FromContext(ctx).Namespace())). + MatchEvent(test.HasType("dev.knative.eventing.samples.heartbeat")). + AtLeast(1)(ctx, t) + }) return f } @@ -140,10 +144,12 @@ func DeadLetterSinkGenericChannel(createSubscriberFn func(ref *duckv1.KReference f.Requirement("containersource is ready", containersource.IsReady(cs)) f.Requirement("Channel has dead letter sink uri", channel_impl.HasDeadLetterSinkURI(name, channel.GVR())) - f.Assert("dls receives events", assert.OnStore(sink). - MatchEvent(test.HasType("dev.knative.eventing.samples.heartbeat")). - AtLeast(1), - ) + f.Assert("dls receives events", func(ctx context.Context, t feature.T) { + assert.OnStore(sink). + Match(features.HasKnNamespaceHeader(environment.FromContext(ctx).Namespace())). + MatchEvent(test.HasType("dev.knative.eventing.samples.heartbeat")). + AtLeast(1)(ctx, t) + }) return f } @@ -306,10 +312,12 @@ func ChannelPreferHeaderCheck(createSubscriberFn func(ref *duckv1.KReference, ur )) f.Stable("test message without explicit prefer header should have the header"). - Must("delivers events", + Must("delivers events", func(ctx context.Context, t feature.T) { eventasssert.OnStore(sink).Match( + features.HasKnNamespaceHeader(environment.FromContext(ctx).Namespace()), eventasssert.HasAdditionalHeader("Prefer", "reply"), - ).AtLeast(1)) + ).AtLeast(1)(ctx, t) + }) return f } diff --git a/test/rekt/features/matchers.go b/test/rekt/features/matchers.go new file mode 100644 index 00000000000..292b84f4e0f --- /dev/null +++ b/test/rekt/features/matchers.go @@ -0,0 +1,40 @@ +/* +Copyright 2023 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package features + +import ( + "fmt" + + "knative.dev/reconciler-test/pkg/eventshub" + + "knative.dev/eventing/pkg/apis" +) + +func HasKnNamespaceHeader(ns string) eventshub.EventInfoMatcher { + return func(info eventshub.EventInfo) error { + values, ok := info.HTTPHeaders[apis.KnNamespaceHeader] + if !ok { + return fmt.Errorf("%s header not found", apis.KnNamespaceHeader) + } + for _, v := range values { + if v == ns { + return nil + } + } + return fmt.Errorf("wanted %s header to have value %s, got %+v", apis.KnNamespaceHeader, ns, values) + } +} diff --git a/test/rekt/features/pingsource/features.go b/test/rekt/features/pingsource/features.go index fb143730148..55bac218a25 100644 --- a/test/rekt/features/pingsource/features.go +++ b/test/rekt/features/pingsource/features.go @@ -22,6 +22,7 @@ import ( "github.com/cloudevents/sdk-go/v2/test" "k8s.io/apimachinery/pkg/util/sets" duckv1 "knative.dev/pkg/apis/duck/v1" + "knative.dev/reconciler-test/pkg/environment" "knative.dev/reconciler-test/pkg/eventshub" "knative.dev/reconciler-test/pkg/feature" "knative.dev/reconciler-test/pkg/manifest" @@ -31,6 +32,7 @@ import ( eventassert "knative.dev/reconciler-test/pkg/eventshub/assert" sourcesv1 "knative.dev/eventing/pkg/apis/sources/v1" + "knative.dev/eventing/test/rekt/features" "knative.dev/eventing/test/rekt/features/featureflags" "knative.dev/eventing/test/rekt/features/source" "knative.dev/eventing/test/rekt/resources/broker" @@ -51,7 +53,13 @@ func SendsEventsWithSinkRef() *feature.Feature { f.Stable("pingsource as event source"). Must("delivers events", - assert.OnStore(sink).MatchEvent(test.HasType("dev.knative.sources.ping")).AtLeast(1)) + func(ctx context.Context, t feature.T) { + assert.OnStore(sink). + Match(features.HasKnNamespaceHeader(environment.FromContext(ctx).Namespace())). + MatchEvent(test.HasType("dev.knative.sources.ping")). + AtLeast(1)(ctx, t) + }, + ) return f }