From 7de3ecc81d12731d29a7eabe5579444ce165e928 Mon Sep 17 00:00:00 2001 From: Knative Automation Date: Fri, 4 Aug 2023 07:12:07 +0100 Subject: [PATCH] [release-1.10] Upgrade to latest dependencies (#7144) upgrade to latest dependencies bumping knative.dev/reconciler-test ca17404...70df278: > 70df278 Knative Service forwarder for EventsHub (# 517) (# 566) > 7f95e17 eventshub RBAC resources independent to avoid deletion conflicts (# 538) (# 565) > 288a740 Fix unit tests (# 556) Signed-off-by: Knative Automation --- go.mod | 2 +- go.sum | 4 +- .../reconciler-test/cmd/eventshub/main.go | 6 + .../pkg/eventshub/102-service.yaml | 2 +- .../pkg/eventshub/103-pod.yaml | 2 +- .../pkg/eventshub/104-forwarder.yaml | 50 ++++ .../pkg/eventshub/event_log.go | 5 +- .../pkg/eventshub/eventshub_image.go | 4 +- .../pkg/eventshub/forwarder/forwarder.go | 262 ++++++++++++++++++ .../reconciler-test/pkg/eventshub/options.go | 20 +- .../pkg/eventshub/rbac/100-sa.yaml | 2 +- .../pkg/eventshub/rbac/101-rbac.yaml | 8 +- .../pkg/eventshub/rbac/rbac.go | 4 +- .../pkg/eventshub/resources.go | 56 +++- .../pkg/eventshub/sender/sender.go | 4 +- .../reconciler-test/pkg/eventshub/utils.go | 9 +- .../pkg/resources/knativeservice/ksvc.go | 34 +++ vendor/modules.txt | 4 +- 18 files changed, 443 insertions(+), 35 deletions(-) create mode 100644 vendor/knative.dev/reconciler-test/pkg/eventshub/104-forwarder.yaml create mode 100644 vendor/knative.dev/reconciler-test/pkg/eventshub/forwarder/forwarder.go create mode 100644 vendor/knative.dev/reconciler-test/pkg/resources/knativeservice/ksvc.go diff --git a/go.mod b/go.mod index 2d8c0165996..6a590855d61 100644 --- a/go.mod +++ b/go.mod @@ -47,7 +47,7 @@ require ( knative.dev/hack v0.0.0-20230417170854-f591fea109b3 knative.dev/hack/schema v0.0.0-20230417170854-f591fea109b3 knative.dev/pkg v0.0.0-20230418073056-dfad48eaa5d0 - knative.dev/reconciler-test v0.0.0-20230728072509-ca174046aede + knative.dev/reconciler-test v0.0.0-20230803113521-70df278b667d sigs.k8s.io/yaml v1.3.0 ) diff --git a/go.sum b/go.sum index 8f2c279b0aa..44dc2bfe997 100644 --- a/go.sum +++ b/go.sum @@ -1049,8 +1049,8 @@ knative.dev/hack/schema v0.0.0-20230417170854-f591fea109b3 h1:TUHxKhNDLCX/XaqNaX knative.dev/hack/schema v0.0.0-20230417170854-f591fea109b3/go.mod h1:GeIb+PLd5mllawcpHEGF5J5fYTQrvgEO5liao8lUKUs= knative.dev/pkg v0.0.0-20230418073056-dfad48eaa5d0 h1:EFQcoUo8I4bc+U3y6tR1B3ONYZSHWUdAfI7Vh7dae8g= knative.dev/pkg v0.0.0-20230418073056-dfad48eaa5d0/go.mod h1:2qWPP9Gjh9Q7ETti+WRHnBnGCSCq+6q7m3p/nmUQviE= -knative.dev/reconciler-test v0.0.0-20230728072509-ca174046aede h1:m+oZ4iW4V2ILrTkkq1wxQYfyiiM5SopYedw8R1lm/iU= -knative.dev/reconciler-test v0.0.0-20230728072509-ca174046aede/go.mod h1:By7fsbkjKWbTmxwAs9lL1itxZI1otbhiEsAZmprEtvI= +knative.dev/reconciler-test v0.0.0-20230803113521-70df278b667d h1:cGtTbth6UZJuH8JAvE1KHQltNXVApxMNFCcnlRgnJKc= +knative.dev/reconciler-test v0.0.0-20230803113521-70df278b667d/go.mod h1:By7fsbkjKWbTmxwAs9lL1itxZI1otbhiEsAZmprEtvI= pgregory.net/rapid v0.3.3 h1:jCjBsY4ln4Atz78QoBWxUEvAHaFyNDQg9+WU62aCn1U= pgregory.net/rapid v0.3.3/go.mod h1:UYpPVyjFHzYBGHIxLFoupi8vwk6rXNzRY9OMvVxFIOU= rsc.io/binaryregexp v0.2.0/go.mod h1:qTv7/COck+e2FymRvadv62gMdZztPaShugOCi3I+8D8= diff --git a/vendor/knative.dev/reconciler-test/cmd/eventshub/main.go b/vendor/knative.dev/reconciler-test/cmd/eventshub/main.go index 41293e13619..32909516ca2 100644 --- a/vendor/knative.dev/reconciler-test/cmd/eventshub/main.go +++ b/vendor/knative.dev/reconciler-test/cmd/eventshub/main.go @@ -20,6 +20,7 @@ import ( "context" "knative.dev/pkg/logging" + "knative.dev/reconciler-test/pkg/eventshub/forwarder" "knative.dev/reconciler-test/pkg/eventshub" "knative.dev/reconciler-test/pkg/eventshub/logger_vent" @@ -45,6 +46,11 @@ func main() { eventshub.SenderEventGenerator: func(ctx context.Context, logs *eventshub.EventLogs) error { return sender.Start(ctx, logs, eventshub.WithClientTracing) }, + eventshub.ForwarderEventGenerator: func(ctx context.Context, logs *eventshub.EventLogs) error { + return forwarder.NewFromEnv(ctx, logs, + []eventshub.HandlerFunc{eventshub.WithServerTracing}, + []eventshub.ClientOption{eventshub.WithClientTracing}).Start(ctx) + }, }, ) } diff --git a/vendor/knative.dev/reconciler-test/pkg/eventshub/102-service.yaml b/vendor/knative.dev/reconciler-test/pkg/eventshub/102-service.yaml index b0400e7e944..1c2635bd2db 100644 --- a/vendor/knative.dev/reconciler-test/pkg/eventshub/102-service.yaml +++ b/vendor/knative.dev/reconciler-test/pkg/eventshub/102-service.yaml @@ -15,7 +15,7 @@ apiVersion: v1 kind: Service metadata: - name: {{ .name }} + name: {{ .serviceName }} namespace: {{ .namespace }} spec: selector: diff --git a/vendor/knative.dev/reconciler-test/pkg/eventshub/103-pod.yaml b/vendor/knative.dev/reconciler-test/pkg/eventshub/103-pod.yaml index b49e07681a6..5eaf4bd49b8 100644 --- a/vendor/knative.dev/reconciler-test/pkg/eventshub/103-pod.yaml +++ b/vendor/knative.dev/reconciler-test/pkg/eventshub/103-pod.yaml @@ -26,7 +26,7 @@ metadata: {{ end }} {{ end }} spec: - serviceAccountName: "{{ .namespace }}" + serviceAccountName: "{{ .name }}" restartPolicy: "OnFailure" {{ if .podSecurityContext }} securityContext: diff --git a/vendor/knative.dev/reconciler-test/pkg/eventshub/104-forwarder.yaml b/vendor/knative.dev/reconciler-test/pkg/eventshub/104-forwarder.yaml new file mode 100644 index 00000000000..3534ff20caf --- /dev/null +++ b/vendor/knative.dev/reconciler-test/pkg/eventshub/104-forwarder.yaml @@ -0,0 +1,50 @@ +# Copyright 2023 The Knative Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +apiVersion: serving.knative.dev/v1 +kind: Service +metadata: + name: {{ .name }} + namespace: {{ .namespace }} + {{ if .annotations }} + annotations: + {{ range $key, $value := .annotations }} + {{ $key }}: "{{ $value }}" + {{ end }} + {{ end }} +spec: + template: + {{ if .podannotations }} + annotations: + {{ range $key, $value := .podannotations }} + {{ $key }}: "{{ $value }}" + {{ end }} + {{ end }} + spec: + serviceAccountName: "{{ .namespace }}" + containers: + - name: eventshub-forwarder + image: {{ .image }} + imagePullPolicy: "IfNotPresent" + env: + - name: "NAME" + value: {{ .name }} + - name: "NAMESPACE" + value: {{ .namespace }} + - name: "SINK" + value: {{ .sink }} + {{ range $key, $value := .envs }} + - name: {{printf "%q" $key}} + value: {{printf "%q" $value}} + {{ end }} diff --git a/vendor/knative.dev/reconciler-test/pkg/eventshub/event_log.go b/vendor/knative.dev/reconciler-test/pkg/eventshub/event_log.go index dc17e88f5cb..a115132992a 100644 --- a/vendor/knative.dev/reconciler-test/pkg/eventshub/event_log.go +++ b/vendor/knative.dev/reconciler-test/pkg/eventshub/event_log.go @@ -45,8 +45,9 @@ func (e *EventLogs) Vent(observed EventInfo) error { } const ( - ReceiverEventGenerator string = "receiver" - SenderEventGenerator string = "sender" + ReceiverEventGenerator string = "receiver" + SenderEventGenerator string = "sender" + ForwarderEventGenerator string = "forwarder" RecorderEventLog string = "recorder" LoggerEventLog string = "logger" diff --git a/vendor/knative.dev/reconciler-test/pkg/eventshub/eventshub_image.go b/vendor/knative.dev/reconciler-test/pkg/eventshub/eventshub_image.go index 085a8297204..b2b2419acea 100644 --- a/vendor/knative.dev/reconciler-test/pkg/eventshub/eventshub_image.go +++ b/vendor/knative.dev/reconciler-test/pkg/eventshub/eventshub_image.go @@ -31,7 +31,7 @@ func ImageFromContext(ctx context.Context) string { if e, ok := ctx.Value(eventshubImageKey{}).(string); ok { return e } - return "ko://" + cmdPackage() + return "ko://" + eventshubPackage() } // WithCustomImage allows you to specify a custom eventshub image to be used when invoking eventshub.Install @@ -48,7 +48,7 @@ func registerImage(ctx context.Context) error { return err } -func cmdPackage() string { +func eventshubPackage() string { this := reflect.TypeOf(eventshubImageKey{}).PkgPath() root := path.Dir(path.Dir(this)) return path.Join(root, "cmd", "eventshub") diff --git a/vendor/knative.dev/reconciler-test/pkg/eventshub/forwarder/forwarder.go b/vendor/knative.dev/reconciler-test/pkg/eventshub/forwarder/forwarder.go new file mode 100644 index 00000000000..ba940f72d3d --- /dev/null +++ b/vendor/knative.dev/reconciler-test/pkg/eventshub/forwarder/forwarder.go @@ -0,0 +1,262 @@ +/* +Copyright 2023 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package forwarder + +import ( + "context" + "fmt" + "io/ioutil" + "net/http" + "strings" + "time" + + cloudevents "github.com/cloudevents/sdk-go/v2" + cloudeventsbindings "github.com/cloudevents/sdk-go/v2/binding" + "go.opencensus.io/trace" + "go.uber.org/zap" + + cehttp "github.com/cloudevents/sdk-go/v2/protocol/http" + cloudeventshttp "github.com/cloudevents/sdk-go/v2/protocol/http" + "github.com/kelseyhightower/envconfig" + "knative.dev/pkg/logging" + + "knative.dev/reconciler-test/pkg/eventshub" +) + +// Forwarder is the entry point for sinking events into the event log. +type Forwarder struct { + // Name is the name of this Forwarder. + Name string + + // The current namespace. + Namespace string `envconfig:"NAMESPACE" required:"true"` + + // Sink + Sink string + + // EventLogs is the list of EventLogger implementors to vent observed events. + EventLogs *eventshub.EventLogs + + ctx context.Context + handlerFuncs []eventshub.HandlerFunc + clientOpts []eventshub.ClientOption + httpClient *http.Client +} + +type envConfig struct { + // Name is used to identify this instance of the forwarder. + Name string `envconfig:"NAME" default:"forwarder-default" required:"true"` + + // The current namespace. + Namespace string `envconfig:"NAMESPACE" required:"true"` + + // Sink url for the message destination + Sink string `envconfig:"SINK" required:"true"` +} + +func NewFromEnv(ctx context.Context, eventLogs *eventshub.EventLogs, handlerFuncs []eventshub.HandlerFunc, clientOpts []eventshub.ClientOption) *Forwarder { + var env envConfig + if err := envconfig.Process("", &env); err != nil { + logging.FromContext(ctx).Fatal("Failed to process env var", err) + } + + logging.FromContext(ctx).Infof("Forwarder environment configuration: %+v", env) + + return &Forwarder{ + Name: env.Name, + Namespace: env.Namespace, + Sink: env.Sink, + EventLogs: eventLogs, + ctx: ctx, + handlerFuncs: handlerFuncs, + clientOpts: clientOpts, + httpClient: &http.Client{}, + } +} + +// Start will create the CloudEvents client and start listening for inbound +// HTTP requests. This is a blocking call. +func (o *Forwarder) Start(ctx context.Context) error { + var handler http.Handler = o + + for _, opt := range o.clientOpts { + if err := opt(o.httpClient); err != nil { + return fmt.Errorf("unable to apply client option: %w", err) + } + } + + for _, dec := range o.handlerFuncs { + handler = dec(handler) + } + + server := &http.Server{Addr: ":8080", Handler: handler} + + var err error + go func() { + err = server.ListenAndServe() + }() + + <-ctx.Done() + + if err != nil { + return fmt.Errorf("error while starting the HTTP server: %w", err) + } + + logging.FromContext(ctx).Info("Closing the HTTP server") + + return server.Close() +} + +func (o *Forwarder) ServeHTTP(writer http.ResponseWriter, request *http.Request) { + requestCtx, span := trace.StartSpan(request.Context(), "eventshub-forwarder") + defer span.End() + + m := cloudeventshttp.NewMessageFromHttpRequest(request) + defer m.Finish(nil) + + event, eventErr := cloudeventsbindings.ToEvent(context.TODO(), m) + headers := make(http.Header) + for k, v := range request.Header { + if !strings.HasPrefix(k, "Ce-") { + headers[k] = v + } + } + // Host header is removed from the request.Header map by net/http + if request.Host != "" { + headers.Set("Host", request.Host) + } + + eventErrStr := "" + if eventErr != nil { + eventErrStr = eventErr.Error() + } + + eventInfo := eventshub.EventInfo{ + Error: eventErrStr, + Event: event, + Observer: o.Name, + HTTPHeaders: headers, + Origin: request.RemoteAddr, + Time: time.Now(), + Kind: eventshub.EventReceived, + } + + // Log the event that is being forwarded + if err := o.EventLogs.Vent(eventInfo); err != nil { + logging.FromContext(o.ctx).Fatalw("Error while venting the received event", zap.Error(err)) + } + + req, err := http.NewRequestWithContext(requestCtx, http.MethodPost, o.Sink, nil) + if err != nil { + logging.FromContext(o.ctx).Error("Cannot create the request: ", err) + } + err = cehttp.WriteRequest(requestCtx, m, req) + if err != nil { + logging.FromContext(o.ctx).Error("Cannot write the event: ", err) + } + + eventString := "unknown" + if event != nil { + eventString = event.String() + } + span.AddAttributes( + trace.StringAttribute("namespace", o.Namespace), + trace.StringAttribute("event", eventString), + ) + + res, err := o.httpClient.Do(req) + + // Publish sent event info + if err := o.EventLogs.Vent(o.sentInfo(event, req, err)); err != nil { + logging.FromContext(o.ctx).Error("Cannot log forwarded event: ", err) + } + + if err == nil { + // Vent the response info + if err := o.EventLogs.Vent(o.responseInfo(res, event)); err != nil { + logging.FromContext(o.ctx).Error("Cannot log response for forwarded event: ", err) + } + } + + writer.WriteHeader(http.StatusAccepted) +} + +func (o *Forwarder) sentInfo(event *cloudevents.Event, req *http.Request, err error) eventshub.EventInfo { + var eventId string + if event != nil { + eventId = event.ID() + } + + eventInfo := eventshub.EventInfo{ + Kind: eventshub.EventSent, + Origin: o.Name, + Observer: o.Name, + Time: time.Now(), + SentId: eventId, + } + + sentHeaders := make(http.Header) + for k, v := range req.Header { + sentHeaders[k] = v + } + eventInfo.HTTPHeaders = sentHeaders + + if err != nil { + eventInfo.Error = err.Error() + } else { + eventInfo.Event = event + } + + return eventInfo +} + +func (o *Forwarder) responseInfo(res *http.Response, event *cloudevents.Event) eventshub.EventInfo { + var eventId string + if event != nil { + eventId = event.ID() + } + + responseInfo := eventshub.EventInfo{ + Kind: eventshub.EventResponse, + HTTPHeaders: res.Header, + Origin: o.Sink, + Observer: o.Name, + Time: time.Now(), + StatusCode: res.StatusCode, + SentId: eventId, + } + + responseMessage := cehttp.NewMessageFromHttpResponse(res) + + if responseMessage.ReadEncoding() == cloudeventsbindings.EncodingUnknown { + body, err := ioutil.ReadAll(res.Body) + + if err != nil { + responseInfo.Error = err.Error() + } else { + responseInfo.Body = body + } + } else { + responseEvent, err := cloudeventsbindings.ToEvent(context.Background(), responseMessage) + if err != nil { + responseInfo.Error = err.Error() + } else { + responseInfo.Event = responseEvent + } + } + return responseInfo +} diff --git a/vendor/knative.dev/reconciler-test/pkg/eventshub/options.go b/vendor/knative.dev/reconciler-test/pkg/eventshub/options.go index f483944ee8e..2965024bd0f 100644 --- a/vendor/knative.dev/reconciler-test/pkg/eventshub/options.go +++ b/vendor/knative.dev/reconciler-test/pkg/eventshub/options.go @@ -33,17 +33,29 @@ import ( "knative.dev/reconciler-test/pkg/k8s" ) +type forwarderKey struct{} + +// WithKnativeServiceForwarder deploys a Knative Service forwarder that will forward requests to eventshub. +func WithKnativeServiceForwarder(ctx context.Context, env environment.Environment) (context.Context, error) { + return context.WithValue(ctx, forwarderKey{}, true), nil +} + +func isForwarder(ctx context.Context) bool { + v := ctx.Value(forwarderKey{}) + return v != nil && v.(bool) +} + // EventsHubOption is used to define an env for the eventshub image type EventsHubOption = func(context.Context, map[string]string) error // StartReceiver starts the receiver in the eventshub // This can be used together with EchoEvent, ReplyWithTransformedEvent, ReplyWithAppendedData -var StartReceiver EventsHubOption = envAdditive("EVENT_GENERATORS", "receiver") +var StartReceiver EventsHubOption = envAdditive(EventGeneratorsEnv, "receiver") // StartSender starts the sender in the eventshub // This can be used together with InputEvent, AddTracing, EnableIncrementalId, InputEncoding and InputHeader options func StartSender(sinkSvc string) EventsHubOption { - return compose(envAdditive("EVENT_GENERATORS", "sender"), func(ctx context.Context, envs map[string]string) error { + return compose(envAdditive(EventGeneratorsEnv, "sender"), func(ctx context.Context, envs map[string]string) error { envs["SINK"] = "http://" + network.GetServiceHostname(sinkSvc, environment.FromContext(ctx).Namespace()) return nil }) @@ -52,7 +64,7 @@ func StartSender(sinkSvc string) EventsHubOption { // StartSenderToResource starts the sender in the eventshub pointing to the provided resource // This can be used together with InputEvent, AddTracing, EnableIncrementalId, InputEncoding and InputHeader options func StartSenderToResource(gvr schema.GroupVersionResource, name string) EventsHubOption { - return compose(envAdditive("EVENT_GENERATORS", "sender"), func(ctx context.Context, envs map[string]string) error { + return compose(envAdditive(EventGeneratorsEnv, "sender"), func(ctx context.Context, envs map[string]string) error { u, err := k8s.Address(ctx, gvr, name) if err != nil { return err @@ -68,7 +80,7 @@ func StartSenderToResource(gvr schema.GroupVersionResource, name string) EventsH // StartSenderURL starts the sender in the eventshub sinking to a URL. // This can be used together with InputEvent, AddTracing, EnableIncrementalId, InputEncoding and InputHeader options func StartSenderURL(sink string) EventsHubOption { - return compose(envAdditive("EVENT_GENERATORS", "sender"), func(ctx context.Context, envs map[string]string) error { + return compose(envAdditive(EventGeneratorsEnv, "sender"), func(ctx context.Context, envs map[string]string) error { envs["SINK"] = sink return nil }) diff --git a/vendor/knative.dev/reconciler-test/pkg/eventshub/rbac/100-sa.yaml b/vendor/knative.dev/reconciler-test/pkg/eventshub/rbac/100-sa.yaml index b949962725b..f86b523942c 100644 --- a/vendor/knative.dev/reconciler-test/pkg/eventshub/rbac/100-sa.yaml +++ b/vendor/knative.dev/reconciler-test/pkg/eventshub/rbac/100-sa.yaml @@ -15,5 +15,5 @@ apiVersion: v1 kind: ServiceAccount metadata: - name: {{ .namespace }} + name: {{ .name }} namespace: {{ .namespace }} diff --git a/vendor/knative.dev/reconciler-test/pkg/eventshub/rbac/101-rbac.yaml b/vendor/knative.dev/reconciler-test/pkg/eventshub/rbac/101-rbac.yaml index 5839e77b2a9..dffe43896d3 100644 --- a/vendor/knative.dev/reconciler-test/pkg/eventshub/rbac/101-rbac.yaml +++ b/vendor/knative.dev/reconciler-test/pkg/eventshub/rbac/101-rbac.yaml @@ -15,7 +15,7 @@ apiVersion: rbac.authorization.k8s.io/v1 kind: Role metadata: - name: {{ .namespace }} + name: {{ .name }} namespace: {{ .namespace }} rules: - apiGroups: [ "" ] @@ -35,13 +35,13 @@ rules: apiVersion: rbac.authorization.k8s.io/v1 kind: RoleBinding metadata: - name: {{ .namespace }} + name: {{ .name }} namespace: {{ .namespace }} roleRef: apiGroup: rbac.authorization.k8s.io kind: Role - name: {{ .namespace }} + name: {{ .name }} subjects: - kind: ServiceAccount - name: {{ .namespace }} + name: {{ .name }} namespace: {{ .namespace }} diff --git a/vendor/knative.dev/reconciler-test/pkg/eventshub/rbac/rbac.go b/vendor/knative.dev/reconciler-test/pkg/eventshub/rbac/rbac.go index 099b73a83b9..de8a2cfbde0 100644 --- a/vendor/knative.dev/reconciler-test/pkg/eventshub/rbac/rbac.go +++ b/vendor/knative.dev/reconciler-test/pkg/eventshub/rbac/rbac.go @@ -31,9 +31,9 @@ var templates embed.FS // Install creates the necessary ServiceAccount, Role, RoleBinding for the eventshub. // The resources are named according to the current namespace defined in the environment. -func Install() feature.StepFn { +func Install(cfg map[string]interface{}) feature.StepFn { return func(ctx context.Context, t feature.T) { - if _, err := manifest.InstallYamlFS(ctx, templates, map[string]interface{}{}); err != nil && !apierrors.IsAlreadyExists(err) { + if _, err := manifest.InstallYamlFS(ctx, templates, cfg); err != nil && !apierrors.IsAlreadyExists(err) { t.Fatal(err) } } diff --git a/vendor/knative.dev/reconciler-test/pkg/eventshub/resources.go b/vendor/knative.dev/reconciler-test/pkg/eventshub/resources.go index fdf97692cbd..98a643309a1 100644 --- a/vendor/knative.dev/reconciler-test/pkg/eventshub/resources.go +++ b/vendor/knative.dev/reconciler-test/pkg/eventshub/resources.go @@ -29,10 +29,15 @@ import ( "knative.dev/reconciler-test/pkg/k8s" "knative.dev/reconciler-test/pkg/knative" "knative.dev/reconciler-test/pkg/manifest" + "knative.dev/reconciler-test/pkg/resources/knativeservice" + "knative.dev/reconciler-test/pkg/resources/service" ) -//go:embed *.yaml -var templates embed.FS +//go:embed 102-service.yaml 103-pod.yaml +var servicePodTemplates embed.FS + +//go:embed 104-forwarder.yaml +var forwarderTemplates embed.FS // Install starts a new eventshub with the provided name // Note: this function expects that the Environment is configured with the @@ -67,26 +72,40 @@ func Install(name string, options ...EventsHubOption) feature.StepFn { eventListener := k8s.EventListenerFromContext(ctx) registerEventsHubStore(ctx, eventListener, name, namespace) - // Install ServiceAccount, Role, RoleBinding - eventshubrbac.Install()(ctx, t) + isReceiver := strings.Contains(envs[EventGeneratorsEnv], "receiver") - isReceiver := strings.Contains(envs["EVENT_GENERATORS"], "receiver") + var withForwarder bool + // Allow forwarder only when eventshub is receiver. + if isForwarder(ctx) && isReceiver { + withForwarder = isForwarder(ctx) + } + + serviceName := name + // When forwarder is included we need to rename the eventshub service to + // prevent conflict with the forwarder service. + if withForwarder { + serviceName = feature.MakeRandomK8sName(name) + } cfg := map[string]interface{}{ "name": name, + "serviceName": serviceName, "envs": envs, "image": ImageFromContext(ctx), "withReadiness": isReceiver, } + // Install ServiceAccount, Role, RoleBinding + eventshubrbac.Install(cfg)(ctx, t) + if ic := environment.GetIstioConfig(ctx); ic.Enabled { manifest.WithIstioPodAnnotations(cfg) } manifest.PodSecurityCfgFn(ctx, t)(cfg) - // Deploy - if _, err := manifest.InstallYamlFS(ctx, templates, cfg); err != nil { + // Deploy Service/Pod + if _, err := manifest.InstallYamlFS(ctx, servicePodTemplates, cfg); err != nil { log.Fatal(err) } @@ -94,8 +113,27 @@ func Install(name string, options ...EventsHubOption) feature.StepFn { // If the eventhubs starts an event receiver, we need to wait for the service endpoint to be synced if isReceiver { - k8s.WaitForServiceEndpointsOrFail(ctx, t, name, 1) - k8s.WaitForServiceReadyOrFail(ctx, t, name, "/health/ready") + k8s.WaitForServiceEndpointsOrFail(ctx, t, serviceName, 1) + k8s.WaitForServiceReadyOrFail(ctx, t, serviceName, "/health/ready") + } + + if withForwarder { + sinkURL, err := service.Address(ctx, serviceName) + if err != nil { + log.Fatal(err) + } + // At this point env contains "receiver" so we need to override it. + envs[EventGeneratorsEnv] = "forwarder" + // No event recording desired, just logging. + envs[EventLogsEnv] = "logger" + cfg["envs"] = envs + cfg["sink"] = sinkURL + + // Deploy Forwarder + if _, err := manifest.InstallYamlFS(ctx, forwarderTemplates, cfg); err != nil { + log.Fatal(err) + } + knativeservice.IsReady(name) } } } diff --git a/vendor/knative.dev/reconciler-test/pkg/eventshub/sender/sender.go b/vendor/knative.dev/reconciler-test/pkg/eventshub/sender/sender.go index f423612b498..7b99563b962 100644 --- a/vendor/knative.dev/reconciler-test/pkg/eventshub/sender/sender.go +++ b/vendor/knative.dev/reconciler-test/pkg/eventshub/sender/sender.go @@ -107,9 +107,7 @@ type generator struct { eventQueue []conformanceevent.Event } -type Option func(*nethttp.Client) error - -func Start(ctx context.Context, logs *eventshub.EventLogs, clientOpts ...Option) error { +func Start(ctx context.Context, logs *eventshub.EventLogs, clientOpts ...eventshub.ClientOption) error { var env generator if err := envconfig.Process("", &env); err != nil { return fmt.Errorf("failed to process env var. %w", err) diff --git a/vendor/knative.dev/reconciler-test/pkg/eventshub/utils.go b/vendor/knative.dev/reconciler-test/pkg/eventshub/utils.go index 8400e5db79d..e629d0f074b 100644 --- a/vendor/knative.dev/reconciler-test/pkg/eventshub/utils.go +++ b/vendor/knative.dev/reconciler-test/pkg/eventshub/utils.go @@ -33,8 +33,10 @@ import ( ) const ( - ConfigTracingEnv = "K_CONFIG_TRACING" - ConfigLoggingEnv = "K_CONFIG_LOGGING" + ConfigTracingEnv = "K_CONFIG_TRACING" + ConfigLoggingEnv = "K_CONFIG_LOGGING" + EventGeneratorsEnv = "EVENT_GENERATORS" + EventLogsEnv = "EVENT_LOGS" ) func ParseHeaders(serializedHeaders string) http.Header { @@ -115,3 +117,6 @@ func WithClientTracing(client *http.Client) error { } return nil } + +type HandlerFunc func(handler http.Handler) http.Handler +type ClientOption func(*http.Client) error diff --git a/vendor/knative.dev/reconciler-test/pkg/resources/knativeservice/ksvc.go b/vendor/knative.dev/reconciler-test/pkg/resources/knativeservice/ksvc.go new file mode 100644 index 00000000000..9a81db3378c --- /dev/null +++ b/vendor/knative.dev/reconciler-test/pkg/resources/knativeservice/ksvc.go @@ -0,0 +1,34 @@ +/* +Copyright 2023 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package knativeservice + +import ( + "time" + + "k8s.io/apimachinery/pkg/runtime/schema" + "knative.dev/reconciler-test/pkg/feature" + "knative.dev/reconciler-test/pkg/k8s" +) + +func GVR() schema.GroupVersionResource { + return schema.GroupVersionResource{Group: "serving.knative.dev", Version: "v1", Resource: "services"} +} + +// IsReady tests to see if a knative Service becomes ready within the time given. +func IsReady(name string, timings ...time.Duration) feature.StepFn { + return k8s.IsReady(GVR(), name, timings...) +} diff --git a/vendor/modules.txt b/vendor/modules.txt index e90f757e1de..58224824d12 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -1379,7 +1379,7 @@ knative.dev/pkg/webhook/resourcesemantics knative.dev/pkg/webhook/resourcesemantics/conversion knative.dev/pkg/webhook/resourcesemantics/defaulting knative.dev/pkg/webhook/resourcesemantics/validation -# knative.dev/reconciler-test v0.0.0-20230728072509-ca174046aede +# knative.dev/reconciler-test v0.0.0-20230803113521-70df278b667d ## explicit; go 1.18 knative.dev/reconciler-test/cmd/eventshub knative.dev/reconciler-test/pkg/environment @@ -1388,6 +1388,7 @@ knative.dev/reconciler-test/pkg/eventshub/assert knative.dev/reconciler-test/pkg/eventshub/dropevents knative.dev/reconciler-test/pkg/eventshub/dropevents/dropeventsfibonacci knative.dev/reconciler-test/pkg/eventshub/dropevents/dropeventsfirst +knative.dev/reconciler-test/pkg/eventshub/forwarder knative.dev/reconciler-test/pkg/eventshub/logger_vent knative.dev/reconciler-test/pkg/eventshub/rbac knative.dev/reconciler-test/pkg/eventshub/receiver @@ -1404,6 +1405,7 @@ knative.dev/reconciler-test/pkg/milestone knative.dev/reconciler-test/pkg/resources/cronjob knative.dev/reconciler-test/pkg/resources/deployment knative.dev/reconciler-test/pkg/resources/job +knative.dev/reconciler-test/pkg/resources/knativeservice knative.dev/reconciler-test/pkg/resources/pod knative.dev/reconciler-test/pkg/resources/service knative.dev/reconciler-test/pkg/state