From 37e70306a4c0b4efa1c3366bb2cd8043a67d4196 Mon Sep 17 00:00:00 2001 From: Lionel Villard Date: Mon, 31 Aug 2020 13:54:00 -0400 Subject: [PATCH 1/4] eager pingsource adapter creation --- config/500-pingsource-mt-adapter.yaml | 1 + config/core/deployments/controller.yaml | 3 - .../deployments/pingsource-mt-adapter.yaml | 61 ++++++++++ pkg/reconciler/pingsource/controller.go | 26 +---- pkg/reconciler/pingsource/controller_test.go | 108 +++++------------- pkg/reconciler/pingsource/pingsource.go | 41 ++++--- pkg/reconciler/pingsource/pingsource_test.go | 30 ++--- .../pingsource/resources/receive_adapter.go | 93 ++++----------- .../resources/receive_adapter_test.go | 45 +------- 9 files changed, 167 insertions(+), 241 deletions(-) create mode 120000 config/500-pingsource-mt-adapter.yaml create mode 100644 config/core/deployments/pingsource-mt-adapter.yaml diff --git a/config/500-pingsource-mt-adapter.yaml b/config/500-pingsource-mt-adapter.yaml new file mode 120000 index 00000000000..849b9341ff6 --- /dev/null +++ b/config/500-pingsource-mt-adapter.yaml @@ -0,0 +1 @@ +core/deployments/pingsource-mt-adapter.yaml \ No newline at end of file diff --git a/config/core/deployments/controller.yaml b/config/core/deployments/controller.yaml index 03b341c4f01..4279dddee2b 100644 --- a/config/core/deployments/controller.yaml +++ b/config/core/deployments/controller.yaml @@ -65,9 +65,6 @@ spec: value: config-observability - name: METRICS_DOMAIN value: knative.dev/eventing - # PingSource - - name: MT_PING_IMAGE - value: ko://knative.dev/eventing/cmd/mtping # APIServerSource - name: APISERVER_RA_IMAGE value: ko://knative.dev/eventing/cmd/apiserver_receive_adapter diff --git a/config/core/deployments/pingsource-mt-adapter.yaml b/config/core/deployments/pingsource-mt-adapter.yaml new file mode 100644 index 00000000000..7c4f670065b --- /dev/null +++ b/config/core/deployments/pingsource-mt-adapter.yaml @@ -0,0 +1,61 @@ +# Copyright 2018 The Knative Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +apiVersion: apps/v1 +kind: Deployment +metadata: + name: pingsource-mt-adapter + namespace: knative-eventing + labels: + eventing.knative.dev/release: devel +spec: + replicas: 1 + selector: + matchLabels: + eventing.knative.dev/source: ping-source-controller + sources.knative.dev/role: adapter + template: + metadata: + labels: + eventing.knative.dev/source: ping-source-controller + sources.knative.dev/role: adapter + eventing.knative.dev/release: devel + spec: + containers: + - name: dispatcher + image: ko://knative.dev/eventing/cmd/mtping + env: + - name: SYSTEM_NAMESPACE + valueFrom: + fieldRef: + apiVersion: v1 + fieldPath: metadata.namespace + - name: K_METRICS_CONFIG + value: '' + - name: K_LOGGING_CONFIG + value: '' + - name: K_LOGGING_CONFIG + value: '' + ports: + - containerPort: 9090 + name: metrics + protocol: TCP + resources: + requests: + cpu: 125m + memory: 64Mi + limits: + cpu: 1000m + memory: 2048Mi + serviceAccountName: pingsource-mt-adapter diff --git a/pkg/reconciler/pingsource/controller.go b/pkg/reconciler/pingsource/controller.go index 7b51f056f78..b7f5ff49f8a 100644 --- a/pkg/reconciler/pingsource/controller.go +++ b/pkg/reconciler/pingsource/controller.go @@ -19,7 +19,6 @@ package pingsource import ( "context" - "github.com/kelseyhightower/envconfig" "go.uber.org/zap" appsv1 "k8s.io/api/apps/v1" "k8s.io/client-go/tools/cache" @@ -40,13 +39,6 @@ import ( reconcilersource "knative.dev/eventing/pkg/reconciler/source" ) -// envConfig will be used to extract the required environment variables using -// github.com/kelseyhightower/envconfig. If this configuration cannot be extracted, then -// NewController will panic. -type envConfig struct { - Image string `envconfig:"MT_PING_IMAGE" required:"true"` -} - // NewController initializes the controller and is called by the generated code // Registers event handlers to enqueue events func NewController( @@ -55,11 +47,6 @@ func NewController( ) *controller.Impl { logger := logging.FromContext(ctx) - env := &envConfig{} - if err := envconfig.Process("", env); err != nil { - logger.Fatalw("unable to process PingSourceSource's required environment variables: %v", err) - } - // Retrieve leader election config leaderElectionConfig, err := sharedmain.GetLeaderElectionConfig(ctx) if err != nil { @@ -78,13 +65,12 @@ func NewController( pingSourceInformer := pingsourceinformer.Get(ctx) r := &Reconciler{ - kubeClientSet: kubeclient.Get(ctx), - pingLister: pingSourceInformer.Lister(), - deploymentLister: deploymentInformer.Lister(), - leConfig: leConfig, - loggingContext: ctx, - configs: reconcilersource.WatchConfigurations(ctx, component, cmw), - receiveAdapterImage: env.Image, + kubeClientSet: kubeclient.Get(ctx), + pingLister: pingSourceInformer.Lister(), + deploymentLister: deploymentInformer.Lister(), + leConfig: leConfig, + loggingContext: ctx, + configs: reconcilersource.WatchConfigurations(ctx, component, cmw), } impl := pingsourcereconciler.NewImpl(ctx, r) diff --git a/pkg/reconciler/pingsource/controller_test.go b/pkg/reconciler/pingsource/controller_test.go index 6b30971477a..a9d3d39cc20 100644 --- a/pkg/reconciler/pingsource/controller_test.go +++ b/pkg/reconciler/pingsource/controller_test.go @@ -17,7 +17,6 @@ limitations under the License. package pingsource import ( - "os" "testing" corev1 "k8s.io/api/core/v1" @@ -35,83 +34,38 @@ import ( ) func TestNew(t *testing.T) { - testCases := map[string]struct { - setEnv bool - }{ - "image not set": {}, - "image set": { - setEnv: true, + ctx, _ := SetupFakeContext(t) + c := NewController(ctx, configmap.NewStaticWatcher( + &corev1.ConfigMap{ + ObjectMeta: metav1.ObjectMeta{ + Name: "config-observability", + Namespace: "knative-eventing", + }, + Data: map[string]string{ + "_example": "test-config", + }, + }, &corev1.ConfigMap{ + ObjectMeta: metav1.ObjectMeta{ + Name: "config-logging", + Namespace: "knative-eventing", + }, + Data: map[string]string{ + "zap-logger-config": "test-config", + "loglevel.controller": "info", + "loglevel.webhook": "info", + }, + }, &corev1.ConfigMap{ + ObjectMeta: metav1.ObjectMeta{ + Name: "config-tracing", + Namespace: "knative-eventing", + }, + Data: map[string]string{ + "_example": "test-config", + }, }, - } - for n, tc := range testCases { - t.Run(n, func(t *testing.T) { - if tc.setEnv { - if err := os.Setenv("PING_IMAGE", "anything"); err != nil { - t.Fatalf("Failed to set env var: %v", err) - } - if err := os.Setenv("MT_PING_IMAGE", "anything"); err != nil { - t.Fatalf("Failed to set env var: %v", err) - } - defer func() { - if err := os.Unsetenv("PING_IMAGE"); err != nil { - t.Fatalf("Failed to unset env var: %v", err) - } - if err := os.Unsetenv("MT_PING_IMAGE"); err != nil { - t.Fatalf("Failed to unset env var: %v", err) - } - }() - - if err := os.Setenv("METRICS_DOMAIN", "knative.dev/eventing"); err != nil { - t.Fatalf("Failed to set env var: %v", err) - } - defer func() { - if err := os.Unsetenv("METRICS_DOMAIN"); err != nil { - t.Fatalf("Failed to unset env var: %v", err) - } - }() - } else { - defer func() { - r := recover() - if r == nil { - t.Errorf("Expected NewController to panic, nothing recovered.") - } - }() - } - - ctx, _ := SetupFakeContext(t) - c := NewController(ctx, configmap.NewStaticWatcher( - &corev1.ConfigMap{ - ObjectMeta: metav1.ObjectMeta{ - Name: "config-observability", - Namespace: "knative-eventing", - }, - Data: map[string]string{ - "_example": "test-config", - }, - }, &corev1.ConfigMap{ - ObjectMeta: metav1.ObjectMeta{ - Name: "config-logging", - Namespace: "knative-eventing", - }, - Data: map[string]string{ - "zap-logger-config": "test-config", - "loglevel.controller": "info", - "loglevel.webhook": "info", - }, - }, &corev1.ConfigMap{ - ObjectMeta: metav1.ObjectMeta{ - Name: "config-tracing", - Namespace: "knative-eventing", - }, - Data: map[string]string{ - "_example": "test-config", - }, - }, - )) + )) - if c == nil { - t.Fatal("Expected NewController to return a non-nil value") - } - }) + if c == nil { + t.Fatal("Expected NewController to return a non-nil value") } } diff --git a/pkg/reconciler/pingsource/pingsource.go b/pkg/reconciler/pingsource/pingsource.go index a60b871491a..94a65b22964 100644 --- a/pkg/reconciler/pingsource/pingsource.go +++ b/pkg/reconciler/pingsource/pingsource.go @@ -54,6 +54,7 @@ const ( component = "pingsource" mtcomponent = "pingsource-mt-adapter" mtadapterName = "pingsource-mt-adapter" + containerName = "dispatcher" ) func newWarningSinkNotFound(sink *duckv1.Destination) pkgreconciler.Event { @@ -64,8 +65,6 @@ func newWarningSinkNotFound(sink *duckv1.Destination) pkgreconciler.Event { type Reconciler struct { kubeClientSet kubernetes.Interface - receiveAdapterImage string - // listers index properties about resources pingLister listers.PingSourceLister deploymentLister appsv1listers.DeploymentLister @@ -152,13 +151,11 @@ func (r *Reconciler) reconcileReceiveAdapter(ctx context.Context, source *v1beta } args := resources.Args{ - ServiceAccountName: mtadapterName, - AdapterName: mtadapterName, - Image: r.receiveAdapterImage, - LoggingConfig: loggingConfig, - MetricsConfig: metricsConfig, - LeConfig: r.leConfig, - NoShutdownAfter: mtping.GetNoShutDownAfterValue(), + AdapterName: mtadapterName, + LoggingConfig: loggingConfig, + MetricsConfig: metricsConfig, + LeConfig: r.leConfig, + NoShutdownAfter: mtping.GetNoShutDownAfterValue(), } expected := resources.MakeReceiveAdapter(args) @@ -174,8 +171,9 @@ func (r *Reconciler) reconcileReceiveAdapter(ctx context.Context, source *v1beta return d, nil } return nil, fmt.Errorf("error getting mt adapter deployment %v", err) - } else if podSpecChanged(d.Spec.Template.Spec, expected.Spec.Template.Spec) { - d.Spec.Template.Spec = expected.Spec.Template.Spec + } else if update, c := needsUpdating(ctx, &d.Spec.Template.Spec, &expected.Spec.Template.Spec); update { + c.Env = expected.Spec.Template.Spec.Containers[0].Env + if d, err = r.kubeClientSet.AppsV1().Deployments(system.Namespace()).Update(d); err != nil { return d, err } @@ -187,7 +185,22 @@ func (r *Reconciler) reconcileReceiveAdapter(ctx context.Context, source *v1beta return d, nil } -func podSpecChanged(oldPodSpec corev1.PodSpec, newPodSpec corev1.PodSpec) bool { - // We really care about the fields we set and ignore the test. - return !equality.Semantic.DeepDerivative(newPodSpec, oldPodSpec) +func needsUpdating(ctx context.Context, oldPodSpec *corev1.PodSpec, newPodSpec *corev1.PodSpec) (bool, *corev1.Container) { + // We just care about the environment of the dispatcher container + container := findContainer(oldPodSpec, containerName) + if container == nil { + logging.FromContext(ctx).Errorf("invalid %s deployment: missing the %s container", mtadapterName, containerName) + return false, nil + } + + return !equality.Semantic.DeepEqual(container.Env, newPodSpec.Containers[0].Env), container +} + +func findContainer(podSpec *corev1.PodSpec, name string) *corev1.Container { + for i, container := range podSpec.Containers { + if container.Name == name { + return &podSpec.Containers[i] + } + } + return nil } diff --git a/pkg/reconciler/pingsource/pingsource_test.go b/pkg/reconciler/pingsource/pingsource_test.go index 0f20a253569..ea0500bfcdf 100644 --- a/pkg/reconciler/pingsource/pingsource_test.go +++ b/pkg/reconciler/pingsource/pingsource_test.go @@ -67,16 +67,11 @@ var ( ) const ( - image = "github.com/knative/test/image" - mtimage = "github.com/knative/test/mtimage" - sourceName = "test-ping-source" - sourceUID = "1234" - sourceNameLong = "test-pingserver-source-with-a-very-long-name" - sourceUIDLong = "cafed00d-cafed00d-cafed00d-cafed00d-cafed00d" - testNS = "testnamespace" - testSchedule = "*/2 * * * *" - testData = "data" - crName = "knative-eventing-pingsource-adapter" + sourceName = "test-ping-source" + sourceUID = "1234" + testNS = "testnamespace" + testSchedule = "*/2 * * * *" + testData = "data" sinkName = "testsink" generation = 1 @@ -229,11 +224,10 @@ func TestAllCases(t *testing.T) { table.Test(t, MakeFactory(func(ctx context.Context, listers *Listers, cmw configmap.Watcher) controller.Reconciler { ctx = addressable.WithDuck(ctx) r := &Reconciler{ - kubeClientSet: fakekubeclient.Get(ctx), - pingLister: listers.GetPingSourceV1beta1Lister(), - deploymentLister: listers.GetDeploymentLister(), - tracker: tracker.New(func(types.NamespacedName) {}, 0), - receiveAdapterImage: mtimage, + kubeClientSet: fakekubeclient.Get(ctx), + pingLister: listers.GetPingSourceV1beta1Lister(), + deploymentLister: listers.GetDeploymentLister(), + tracker: tracker.New(func(types.NamespacedName) {}, 0), } r.sinkResolver = resolver.NewURIResolver(ctx, func(types.NamespacedName) {}) @@ -248,10 +242,8 @@ func TestAllCases(t *testing.T) { func MakeMTAdapter() *appsv1.Deployment { args := resources.Args{ - ServiceAccountName: mtadapterName, - AdapterName: mtadapterName, - Image: mtimage, - NoShutdownAfter: mtping.GetNoShutDownAfterValue(), + AdapterName: mtadapterName, + NoShutdownAfter: mtping.GetNoShutDownAfterValue(), } return resources.MakeReceiveAdapter(args) } diff --git a/pkg/reconciler/pingsource/resources/receive_adapter.go b/pkg/reconciler/pingsource/resources/receive_adapter.go index fa4de10e255..9da9b58b2d1 100644 --- a/pkg/reconciler/pingsource/resources/receive_adapter.go +++ b/pkg/reconciler/pingsource/resources/receive_adapter.go @@ -21,34 +21,22 @@ import ( v1 "k8s.io/api/apps/v1" corev1 "k8s.io/api/core/v1" - "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "knative.dev/eventing/pkg/adapter/mtping" "knative.dev/eventing/pkg/adapter/v2" "knative.dev/pkg/system" ) -var ( - mtlabels = map[string]string{ - "sources.knative.dev/role": "adapter", - "eventing.knative.dev/source": controllerAgentName, - } -) - type Args struct { - ServiceAccountName string - AdapterName string - Image string - MetricsConfig string - LoggingConfig string - LeConfig string - NoShutdownAfter int + AdapterName string + MetricsConfig string + LoggingConfig string + LeConfig string + NoShutdownAfter int } // MakeReceiveAdapter generates the mtping deployment for pingsources func MakeReceiveAdapter(args Args) *v1.Deployment { - replicas := int32(1) - return &v1.Deployment{ TypeMeta: metav1.TypeMeta{ APIVersion: "apps/v1", @@ -59,37 +47,30 @@ func MakeReceiveAdapter(args Args) *v1.Deployment { Name: args.AdapterName, }, Spec: v1.DeploymentSpec{ - Replicas: &replicas, - Selector: &metav1.LabelSelector{ - MatchLabels: mtlabels, - }, Template: corev1.PodTemplateSpec{ - ObjectMeta: metav1.ObjectMeta{ - Labels: mtlabels, - }, Spec: corev1.PodSpec{ - ServiceAccountName: args.ServiceAccountName, Containers: []corev1.Container{ { - Name: "dispatcher", - Image: args.Image, - Env: makeEnv(args), - - // Set low resource requests and limits. - // This should be configurable. - Resources: corev1.ResourceRequirements{ - Requests: corev1.ResourceList{ - corev1.ResourceCPU: resource.MustParse("125m"), - corev1.ResourceMemory: resource.MustParse("64Mi"), - }, - Limits: corev1.ResourceList{ - corev1.ResourceCPU: resource.MustParse("1000m"), - corev1.ResourceMemory: resource.MustParse("2048Mi"), + Name: "dispatcher", + Env: []corev1.EnvVar{{ + Name: system.NamespaceEnvKey, + ValueFrom: &corev1.EnvVarSource{ + FieldRef: &corev1.ObjectFieldSelector{ + FieldPath: "metadata.namespace", + }, }, - }, - Ports: []corev1.ContainerPort{{ - Name: "metrics", - ContainerPort: 9090, + }, { + Name: adapter.EnvConfigMetricsConfig, + Value: args.MetricsConfig, + }, { + Name: adapter.EnvConfigLoggingConfig, + Value: args.LoggingConfig, + }, { + Name: adapter.EnvConfigLeaderElectionConfig, + Value: args.LeConfig, + }, { + Name: mtping.EnvNoShutdownAfter, + Value: strconv.Itoa(args.NoShutdownAfter), }}, }, }, @@ -98,29 +79,3 @@ func MakeReceiveAdapter(args Args) *v1.Deployment { }, } } - -func makeEnv(args Args) []corev1.EnvVar { - return []corev1.EnvVar{{ - Name: system.NamespaceEnvKey, - Value: system.Namespace(), - }, { - Name: adapter.EnvConfigNamespace, - ValueFrom: &corev1.EnvVarSource{ - FieldRef: &corev1.ObjectFieldSelector{ - FieldPath: "metadata.namespace", - }, - }, - }, { - Name: adapter.EnvConfigMetricsConfig, - Value: args.MetricsConfig, - }, { - Name: adapter.EnvConfigLoggingConfig, - Value: args.LoggingConfig, - }, { - Name: adapter.EnvConfigLeaderElectionConfig, - Value: args.LeConfig, - }, { - Name: mtping.EnvNoShutdownAfter, - Value: strconv.Itoa(args.NoShutdownAfter), - }} -} diff --git a/pkg/reconciler/pingsource/resources/receive_adapter_test.go b/pkg/reconciler/pingsource/resources/receive_adapter_test.go index 8195953a87e..991a36df465 100644 --- a/pkg/reconciler/pingsource/resources/receive_adapter_test.go +++ b/pkg/reconciler/pingsource/resources/receive_adapter_test.go @@ -22,22 +22,17 @@ import ( "github.com/google/go-cmp/cmp" v1 "k8s.io/api/apps/v1" corev1 "k8s.io/api/core/v1" - "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "knative.dev/pkg/system" _ "knative.dev/pkg/system/testing" ) func TestMakePingAdapter(t *testing.T) { - replicas := int32(1) - args := Args{ - ServiceAccountName: "test-sa", - AdapterName: "test-name", - Image: "test-image", - MetricsConfig: "metrics", - LoggingConfig: "logging", - NoShutdownAfter: 40, + AdapterName: "test-name", + MetricsConfig: "metrics", + LoggingConfig: "logging", + NoShutdownAfter: 40, } want := &v1.Deployment{ @@ -50,25 +45,13 @@ func TestMakePingAdapter(t *testing.T) { Name: args.AdapterName, }, Spec: v1.DeploymentSpec{ - Replicas: &replicas, - Selector: &metav1.LabelSelector{ - MatchLabels: mtlabels, - }, Template: corev1.PodTemplateSpec{ - ObjectMeta: metav1.ObjectMeta{ - Labels: mtlabels, - }, Spec: corev1.PodSpec{ - ServiceAccountName: args.ServiceAccountName, Containers: []corev1.Container{ { - Name: "dispatcher", - Image: args.Image, + Name: "dispatcher", Env: []corev1.EnvVar{{ - Name: system.NamespaceEnvKey, - Value: system.Namespace(), - }, { - Name: "NAMESPACE", + Name: system.NamespaceEnvKey, ValueFrom: &corev1.EnvVarSource{ FieldRef: &corev1.ObjectFieldSelector{ FieldPath: "metadata.namespace", @@ -87,22 +70,6 @@ func TestMakePingAdapter(t *testing.T) { Name: "K_NO_SHUTDOWN_AFTER", Value: "40", }}, - // Set low resource requests and limits. - // This should be configurable. - Resources: corev1.ResourceRequirements{ - Requests: corev1.ResourceList{ - corev1.ResourceCPU: resource.MustParse("125m"), - corev1.ResourceMemory: resource.MustParse("64Mi"), - }, - Limits: corev1.ResourceList{ - corev1.ResourceCPU: resource.MustParse("1000m"), - corev1.ResourceMemory: resource.MustParse("2048Mi"), - }, - }, - Ports: []corev1.ContainerPort{{ - Name: "metrics", - ContainerPort: 9090, - }}, }, }, }, From 0e1c49622e10deb380b28119cf420d1d76a23d92 Mon Sep 17 00:00:00 2001 From: Lionel Villard Date: Tue, 1 Sep 2020 16:54:16 -0400 Subject: [PATCH 2/4] fixes after review --- .../deployments/pingsource-mt-adapter.yaml | 7 ++ pkg/reconciler/pingsource/pingsource.go | 21 ++---- pkg/reconciler/pingsource/pingsource_test.go | 23 ++++++- .../pingsource/resources/receive_adapter.go | 65 ++++++------------- .../resources/receive_adapter_test.go | 64 ++++++------------ 5 files changed, 76 insertions(+), 104 deletions(-) diff --git a/config/core/deployments/pingsource-mt-adapter.yaml b/config/core/deployments/pingsource-mt-adapter.yaml index 7c4f670065b..b48e3895131 100644 --- a/config/core/deployments/pingsource-mt-adapter.yaml +++ b/config/core/deployments/pingsource-mt-adapter.yaml @@ -41,12 +41,19 @@ spec: fieldRef: apiVersion: v1 fieldPath: metadata.namespace + + # The values below are being filled by the ping source controller - name: K_METRICS_CONFIG value: '' - name: K_LOGGING_CONFIG value: '' - name: K_LOGGING_CONFIG value: '' + - name: K_LEADER_ELECTION_CONFIG + value: '' + - name: K_NO_SHUTDOWN_AFTER + value: '' + ports: - containerPort: 9090 name: metrics diff --git a/pkg/reconciler/pingsource/pingsource.go b/pkg/reconciler/pingsource/pingsource.go index 94a65b22964..b9c20e57af1 100644 --- a/pkg/reconciler/pingsource/pingsource.go +++ b/pkg/reconciler/pingsource/pingsource.go @@ -48,7 +48,6 @@ import ( const ( // Name of the corev1.Events emitted from the reconciliation process - pingSourceDeploymentCreated = "PingSourceDeploymentCreated" pingSourceDeploymentUpdated = "PingSourceDeploymentUpdated" component = "pingsource" @@ -151,28 +150,22 @@ func (r *Reconciler) reconcileReceiveAdapter(ctx context.Context, source *v1beta } args := resources.Args{ - AdapterName: mtadapterName, LoggingConfig: loggingConfig, MetricsConfig: metricsConfig, LeConfig: r.leConfig, NoShutdownAfter: mtping.GetNoShutDownAfterValue(), } - expected := resources.MakeReceiveAdapter(args) + expected := resources.MakeReceiveAdapterEnvVar(args) d, err := r.deploymentLister.Deployments(system.Namespace()).Get(mtadapterName) if err != nil { if apierrors.IsNotFound(err) { - d, err := r.kubeClientSet.AppsV1().Deployments(system.Namespace()).Create(expected) - if err != nil { - controller.GetEventRecorder(ctx).Eventf(source, corev1.EventTypeWarning, pingSourceDeploymentCreated, "Cluster-scoped deployment not created (%v)", err) - return nil, err - } - controller.GetEventRecorder(ctx).Event(source, corev1.EventTypeNormal, pingSourceDeploymentCreated, "Cluster-scoped deployment created") - return d, nil + logging.FromContext(ctx).Errorw("pingsource adapter deployment doesn't exist", zap.Error(err)) + return nil, err } return nil, fmt.Errorf("error getting mt adapter deployment %v", err) - } else if update, c := needsUpdating(ctx, &d.Spec.Template.Spec, &expected.Spec.Template.Spec); update { - c.Env = expected.Spec.Template.Spec.Containers[0].Env + } else if update, c := needsUpdating(ctx, &d.Spec.Template.Spec, expected); update { + c.Env = expected if d, err = r.kubeClientSet.AppsV1().Deployments(system.Namespace()).Update(d); err != nil { return d, err @@ -185,7 +178,7 @@ func (r *Reconciler) reconcileReceiveAdapter(ctx context.Context, source *v1beta return d, nil } -func needsUpdating(ctx context.Context, oldPodSpec *corev1.PodSpec, newPodSpec *corev1.PodSpec) (bool, *corev1.Container) { +func needsUpdating(ctx context.Context, oldPodSpec *corev1.PodSpec, newEnvVars []corev1.EnvVar) (bool, *corev1.Container) { // We just care about the environment of the dispatcher container container := findContainer(oldPodSpec, containerName) if container == nil { @@ -193,7 +186,7 @@ func needsUpdating(ctx context.Context, oldPodSpec *corev1.PodSpec, newPodSpec * return false, nil } - return !equality.Semantic.DeepEqual(container.Env, newPodSpec.Containers[0].Env), container + return !equality.Semantic.DeepEqual(container.Env, newEnvVars), container } func findContainer(podSpec *corev1.PodSpec, name string) *corev1.Container { diff --git a/pkg/reconciler/pingsource/pingsource_test.go b/pkg/reconciler/pingsource/pingsource_test.go index ea0500bfcdf..0f2509eb141 100644 --- a/pkg/reconciler/pingsource/pingsource_test.go +++ b/pkg/reconciler/pingsource/pingsource_test.go @@ -20,6 +20,9 @@ import ( "context" "testing" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "knative.dev/pkg/system" + "knative.dev/eventing/pkg/adapter/mtping" appsv1 "k8s.io/api/apps/v1" @@ -242,10 +245,26 @@ func TestAllCases(t *testing.T) { func MakeMTAdapter() *appsv1.Deployment { args := resources.Args{ - AdapterName: mtadapterName, NoShutdownAfter: mtping.GetNoShutDownAfterValue(), } - return resources.MakeReceiveAdapter(args) + return &appsv1.Deployment{ + TypeMeta: metav1.TypeMeta{ + APIVersion: "apps/v1", + Kind: "Deployments", + }, + ObjectMeta: metav1.ObjectMeta{ + Namespace: system.Namespace(), + Name: "adaptername", + }, + Spec: appsv1.DeploymentSpec{ + Template: corev1.PodTemplateSpec{ + Spec: corev1.PodSpec{ + Containers: []corev1.Container{ + { + Name: "dispatcher", + Env: resources.MakeReceiveAdapterEnvVar(args), + }}}}}} + } func makeAvailableMTAdapter() *appsv1.Deployment { diff --git a/pkg/reconciler/pingsource/resources/receive_adapter.go b/pkg/reconciler/pingsource/resources/receive_adapter.go index 9da9b58b2d1..1990ea8ed23 100644 --- a/pkg/reconciler/pingsource/resources/receive_adapter.go +++ b/pkg/reconciler/pingsource/resources/receive_adapter.go @@ -19,63 +19,40 @@ package resources import ( "strconv" - v1 "k8s.io/api/apps/v1" corev1 "k8s.io/api/core/v1" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "knative.dev/eventing/pkg/adapter/mtping" "knative.dev/eventing/pkg/adapter/v2" "knative.dev/pkg/system" ) type Args struct { - AdapterName string MetricsConfig string LoggingConfig string LeConfig string NoShutdownAfter int } -// MakeReceiveAdapter generates the mtping deployment for pingsources -func MakeReceiveAdapter(args Args) *v1.Deployment { - return &v1.Deployment{ - TypeMeta: metav1.TypeMeta{ - APIVersion: "apps/v1", - Kind: "Deployments", - }, - ObjectMeta: metav1.ObjectMeta{ - Namespace: system.Namespace(), - Name: args.AdapterName, - }, - Spec: v1.DeploymentSpec{ - Template: corev1.PodTemplateSpec{ - Spec: corev1.PodSpec{ - Containers: []corev1.Container{ - { - Name: "dispatcher", - Env: []corev1.EnvVar{{ - Name: system.NamespaceEnvKey, - ValueFrom: &corev1.EnvVarSource{ - FieldRef: &corev1.ObjectFieldSelector{ - FieldPath: "metadata.namespace", - }, - }, - }, { - Name: adapter.EnvConfigMetricsConfig, - Value: args.MetricsConfig, - }, { - Name: adapter.EnvConfigLoggingConfig, - Value: args.LoggingConfig, - }, { - Name: adapter.EnvConfigLeaderElectionConfig, - Value: args.LeConfig, - }, { - Name: mtping.EnvNoShutdownAfter, - Value: strconv.Itoa(args.NoShutdownAfter), - }}, - }, - }, - }, +// MakeReceiveAdapterEnvVar generates the environment variables for the pingsources +func MakeReceiveAdapterEnvVar(args Args) []corev1.EnvVar { + return []corev1.EnvVar{{ + Name: system.NamespaceEnvKey, + ValueFrom: &corev1.EnvVarSource{ + FieldRef: &corev1.ObjectFieldSelector{ + FieldPath: "metadata.namespace", }, }, - } + }, { + Name: adapter.EnvConfigMetricsConfig, + Value: args.MetricsConfig, + }, { + Name: adapter.EnvConfigLoggingConfig, + Value: args.LoggingConfig, + }, { + Name: adapter.EnvConfigLeaderElectionConfig, + Value: args.LeConfig, + }, { + Name: mtping.EnvNoShutdownAfter, + Value: strconv.Itoa(args.NoShutdownAfter), + }} + } diff --git a/pkg/reconciler/pingsource/resources/receive_adapter_test.go b/pkg/reconciler/pingsource/resources/receive_adapter_test.go index 991a36df465..a32112a41b1 100644 --- a/pkg/reconciler/pingsource/resources/receive_adapter_test.go +++ b/pkg/reconciler/pingsource/resources/receive_adapter_test.go @@ -20,64 +20,40 @@ import ( "testing" "github.com/google/go-cmp/cmp" - v1 "k8s.io/api/apps/v1" corev1 "k8s.io/api/core/v1" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "knative.dev/pkg/system" _ "knative.dev/pkg/system/testing" ) func TestMakePingAdapter(t *testing.T) { args := Args{ - AdapterName: "test-name", MetricsConfig: "metrics", LoggingConfig: "logging", NoShutdownAfter: 40, } - want := &v1.Deployment{ - TypeMeta: metav1.TypeMeta{ - APIVersion: "apps/v1", - Kind: "Deployments", - }, - ObjectMeta: metav1.ObjectMeta{ - Namespace: system.Namespace(), - Name: args.AdapterName, - }, - Spec: v1.DeploymentSpec{ - Template: corev1.PodTemplateSpec{ - Spec: corev1.PodSpec{ - Containers: []corev1.Container{ - { - Name: "dispatcher", - Env: []corev1.EnvVar{{ - Name: system.NamespaceEnvKey, - ValueFrom: &corev1.EnvVarSource{ - FieldRef: &corev1.ObjectFieldSelector{ - FieldPath: "metadata.namespace", - }, - }, - }, { - Name: "K_METRICS_CONFIG", - Value: "metrics", - }, { - Name: "K_LOGGING_CONFIG", - Value: "logging", - }, { - Name: "K_LEADER_ELECTION_CONFIG", - Value: "", - }, { - Name: "K_NO_SHUTDOWN_AFTER", - Value: "40", - }}, - }, - }, - }, + want := []corev1.EnvVar{{ + Name: system.NamespaceEnvKey, + ValueFrom: &corev1.EnvVarSource{ + FieldRef: &corev1.ObjectFieldSelector{ + FieldPath: "metadata.namespace", }, }, - } - - got := MakeReceiveAdapter(args) + }, { + Name: "K_METRICS_CONFIG", + Value: "metrics", + }, { + Name: "K_LOGGING_CONFIG", + Value: "logging", + }, { + Name: "K_LEADER_ELECTION_CONFIG", + Value: "", + }, { + Name: "K_NO_SHUTDOWN_AFTER", + Value: "40", + }} + + got := MakeReceiveAdapterEnvVar(args) if diff := cmp.Diff(want, got); diff != "" { t.Errorf("unexpected condition (-want, +got) = %v", diff) From 7309f5c028479c9c3e60c6720915d07fbfd49ca8 Mon Sep 17 00:00:00 2001 From: Lionel Villard Date: Thu, 3 Sep 2020 10:27:20 -0400 Subject: [PATCH 3/4] set default replica to 0 --- config/core/deployments/pingsource-mt-adapter.yaml | 3 ++- pkg/reconciler/pingsource/pingsource.go | 14 ++++++++++---- 2 files changed, 12 insertions(+), 5 deletions(-) diff --git a/config/core/deployments/pingsource-mt-adapter.yaml b/config/core/deployments/pingsource-mt-adapter.yaml index b48e3895131..98222228258 100644 --- a/config/core/deployments/pingsource-mt-adapter.yaml +++ b/config/core/deployments/pingsource-mt-adapter.yaml @@ -20,7 +20,8 @@ metadata: labels: eventing.knative.dev/release: devel spec: - replicas: 1 + # when set to 0 (and only 0) will be set to 1 when the first PingSource is created. + replicas: 0 selector: matchLabels: eventing.knative.dev/source: ping-source-controller diff --git a/pkg/reconciler/pingsource/pingsource.go b/pkg/reconciler/pingsource/pingsource.go index b9c20e57af1..cfd7b00a163 100644 --- a/pkg/reconciler/pingsource/pingsource.go +++ b/pkg/reconciler/pingsource/pingsource.go @@ -27,6 +27,7 @@ import ( "k8s.io/apimachinery/pkg/api/equality" apierrors "k8s.io/apimachinery/pkg/api/errors" "k8s.io/client-go/kubernetes" + "k8s.io/utils/pointer" appsv1listers "k8s.io/client-go/listers/apps/v1" duckv1 "knative.dev/pkg/apis/duck/v1" @@ -164,13 +165,17 @@ func (r *Reconciler) reconcileReceiveAdapter(ctx context.Context, source *v1beta return nil, err } return nil, fmt.Errorf("error getting mt adapter deployment %v", err) - } else if update, c := needsUpdating(ctx, &d.Spec.Template.Spec, expected); update { + } else if update, c := needsUpdating(ctx, &d.Spec, expected); update { c.Env = expected + if *d.Spec.Replicas == 0 { + d.Spec.Replicas = pointer.Int32Ptr(1) + } + if d, err = r.kubeClientSet.AppsV1().Deployments(system.Namespace()).Update(d); err != nil { return d, err } - controller.GetEventRecorder(ctx).Event(source, corev1.EventTypeNormal, pingSourceDeploymentUpdated, "Cluster-scoped deployment updated") + controller.GetEventRecorder(ctx).Event(source, corev1.EventTypeNormal, pingSourceDeploymentUpdated, "pingsource adapter deployment updated") return d, nil } else { logging.FromContext(ctx).Debugw("Reusing existing cluster-scoped deployment", zap.Any("deployment", d)) @@ -178,15 +183,16 @@ func (r *Reconciler) reconcileReceiveAdapter(ctx context.Context, source *v1beta return d, nil } -func needsUpdating(ctx context.Context, oldPodSpec *corev1.PodSpec, newEnvVars []corev1.EnvVar) (bool, *corev1.Container) { +func needsUpdating(ctx context.Context, oldDeploymentSpec *appsv1.DeploymentSpec, newEnvVars []corev1.EnvVar) (bool, *corev1.Container) { // We just care about the environment of the dispatcher container + oldPodSpec := &oldDeploymentSpec.Template.Spec container := findContainer(oldPodSpec, containerName) if container == nil { logging.FromContext(ctx).Errorf("invalid %s deployment: missing the %s container", mtadapterName, containerName) return false, nil } - return !equality.Semantic.DeepEqual(container.Env, newEnvVars), container + return *oldDeploymentSpec.Replicas == 0 || !equality.Semantic.DeepEqual(container.Env, newEnvVars), container } func findContainer(podSpec *corev1.PodSpec, name string) *corev1.Container { From dd9456e57c692bececea4ab8586bdbb88f4b4447 Mon Sep 17 00:00:00 2001 From: Lionel Villard Date: Thu, 3 Sep 2020 10:35:45 -0400 Subject: [PATCH 4/4] fix ut --- pkg/reconciler/pingsource/pingsource.go | 8 +++- pkg/reconciler/pingsource/pingsource_test.go | 50 +------------------- 2 files changed, 8 insertions(+), 50 deletions(-) diff --git a/pkg/reconciler/pingsource/pingsource.go b/pkg/reconciler/pingsource/pingsource.go index cfd7b00a163..40e07251ba4 100644 --- a/pkg/reconciler/pingsource/pingsource.go +++ b/pkg/reconciler/pingsource/pingsource.go @@ -168,7 +168,7 @@ func (r *Reconciler) reconcileReceiveAdapter(ctx context.Context, source *v1beta } else if update, c := needsUpdating(ctx, &d.Spec, expected); update { c.Env = expected - if *d.Spec.Replicas == 0 { + if zero(d.Spec.Replicas) { d.Spec.Replicas = pointer.Int32Ptr(1) } @@ -192,7 +192,7 @@ func needsUpdating(ctx context.Context, oldDeploymentSpec *appsv1.DeploymentSpec return false, nil } - return *oldDeploymentSpec.Replicas == 0 || !equality.Semantic.DeepEqual(container.Env, newEnvVars), container + return zero(oldDeploymentSpec.Replicas) || !equality.Semantic.DeepEqual(container.Env, newEnvVars), container } func findContainer(podSpec *corev1.PodSpec, name string) *corev1.Container { @@ -203,3 +203,7 @@ func findContainer(podSpec *corev1.PodSpec, name string) *corev1.Container { } return nil } + +func zero(i *int32) bool { + return i != nil && *i == 0 +} diff --git a/pkg/reconciler/pingsource/pingsource_test.go b/pkg/reconciler/pingsource/pingsource_test.go index 0f2509eb141..6844b3dead2 100644 --- a/pkg/reconciler/pingsource/pingsource_test.go +++ b/pkg/reconciler/pingsource/pingsource_test.go @@ -174,52 +174,6 @@ func TestAllCases(t *testing.T) { WithPingSourceV1B1StatusObservedGeneration(generation), ), }}, - }, { - Name: "valid with cluster scope annotation, create deployment", - SkipNamespaceValidation: true, - Objects: []runtime.Object{ - NewPingSourceV1Beta1(sourceName, testNS, - WithPingSourceV1B1Spec(sourcesv1beta1.PingSourceSpec{ - Schedule: testSchedule, - JsonData: testData, - SourceSpec: duckv1.SourceSpec{ - Sink: sinkDest, - }, - }), - WithPingSourceV1B1UID(sourceUID), - WithPingSourceV1B1ObjectMetaGeneration(generation), - ), - rtv1beta1.NewChannel(sinkName, testNS, - rtv1beta1.WithInitChannelConditions, - rtv1beta1.WithChannelAddress(sinkDNS), - ), - }, - Key: testNS + "/" + sourceName, - WantEvents: []string{ - Eventf(corev1.EventTypeNormal, "PingSourceDeploymentCreated", `Cluster-scoped deployment created`), - }, - WantCreates: []runtime.Object{ - MakeMTAdapter(), - }, - WantStatusUpdates: []clientgotesting.UpdateActionImpl{{ - Object: NewPingSourceV1Beta1(sourceName, testNS, - WithPingSourceV1B1Spec(sourcesv1beta1.PingSourceSpec{ - Schedule: testSchedule, - JsonData: testData, - SourceSpec: duckv1.SourceSpec{ - Sink: sinkDest, - }, - }), - WithPingSourceV1B1UID(sourceUID), - WithPingSourceV1B1ObjectMetaGeneration(generation), - // Status Update: - WithPingSourceV1B1NotDeployed(mtadapterName), - WithInitPingSourceV1B1Conditions, - WithPingSourceV1B1CloudEventAttributes, - WithPingSourceV1B1Sink(sinkURI), - WithPingSourceV1B1StatusObservedGeneration(generation), - ), - }}, }, } @@ -254,14 +208,14 @@ func MakeMTAdapter() *appsv1.Deployment { }, ObjectMeta: metav1.ObjectMeta{ Namespace: system.Namespace(), - Name: "adaptername", + Name: mtadapterName, }, Spec: appsv1.DeploymentSpec{ Template: corev1.PodTemplateSpec{ Spec: corev1.PodSpec{ Containers: []corev1.Container{ { - Name: "dispatcher", + Name: containerName, Env: resources.MakeReceiveAdapterEnvVar(args), }}}}}}