Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Eager pingsource adapter creation #3987

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions config/500-pingsource-mt-adapter.yaml
3 changes: 0 additions & 3 deletions config/core/deployments/controller.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -65,9 +65,6 @@ spec:
value: config-observability
- name: METRICS_DOMAIN
value: knative.dev/eventing
# PingSource
- name: MT_PING_IMAGE
value: ko://knative.dev/eventing/cmd/mtping
# APIServerSource
- name: APISERVER_RA_IMAGE
value: ko://knative.dev/eventing/cmd/apiserver_receive_adapter
Expand Down
69 changes: 69 additions & 0 deletions config/core/deployments/pingsource-mt-adapter.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
# Copyright 2018 The Knative Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

apiVersion: apps/v1
kind: Deployment
metadata:
name: pingsource-mt-adapter
namespace: knative-eventing
labels:
eventing.knative.dev/release: devel
spec:
# when set to 0 (and only 0) will be set to 1 when the first PingSource is created.
replicas: 0
selector:
matchLabels:
eventing.knative.dev/source: ping-source-controller
sources.knative.dev/role: adapter
template:
metadata:
labels:
eventing.knative.dev/source: ping-source-controller
sources.knative.dev/role: adapter
eventing.knative.dev/release: devel
spec:
containers:
- name: dispatcher
image: ko://knative.dev/eventing/cmd/mtping
env:
- name: SYSTEM_NAMESPACE
valueFrom:
fieldRef:
apiVersion: v1
fieldPath: metadata.namespace

# The values below are being filled by the ping source controller
- name: K_METRICS_CONFIG
lionelvillard marked this conversation as resolved.
Show resolved Hide resolved
value: ''
- name: K_LOGGING_CONFIG
value: ''
- name: K_LOGGING_CONFIG
value: ''
- name: K_LEADER_ELECTION_CONFIG
value: ''
- name: K_NO_SHUTDOWN_AFTER
value: ''
lionelvillard marked this conversation as resolved.
Show resolved Hide resolved

ports:
- containerPort: 9090
name: metrics
protocol: TCP
resources:
requests:
cpu: 125m
memory: 64Mi
limits:
cpu: 1000m
memory: 2048Mi
serviceAccountName: pingsource-mt-adapter
26 changes: 6 additions & 20 deletions pkg/reconciler/pingsource/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ package pingsource
import (
"context"

"github.com/kelseyhightower/envconfig"
"go.uber.org/zap"
appsv1 "k8s.io/api/apps/v1"
"k8s.io/client-go/tools/cache"
Expand All @@ -40,13 +39,6 @@ import (
reconcilersource "knative.dev/eventing/pkg/reconciler/source"
)

// envConfig will be used to extract the required environment variables using
// github.com/kelseyhightower/envconfig. If this configuration cannot be extracted, then
// NewController will panic.
type envConfig struct {
Image string `envconfig:"MT_PING_IMAGE" required:"true"`
}

// NewController initializes the controller and is called by the generated code
// Registers event handlers to enqueue events
func NewController(
Expand All @@ -55,11 +47,6 @@ func NewController(
) *controller.Impl {
logger := logging.FromContext(ctx)

env := &envConfig{}
if err := envconfig.Process("", env); err != nil {
logger.Fatalw("unable to process PingSourceSource's required environment variables: %v", err)
}

// Retrieve leader election config
leaderElectionConfig, err := sharedmain.GetLeaderElectionConfig(ctx)
if err != nil {
Expand All @@ -78,13 +65,12 @@ func NewController(
pingSourceInformer := pingsourceinformer.Get(ctx)

r := &Reconciler{
kubeClientSet: kubeclient.Get(ctx),
pingLister: pingSourceInformer.Lister(),
deploymentLister: deploymentInformer.Lister(),
leConfig: leConfig,
loggingContext: ctx,
configs: reconcilersource.WatchConfigurations(ctx, component, cmw),
receiveAdapterImage: env.Image,
kubeClientSet: kubeclient.Get(ctx),
pingLister: pingSourceInformer.Lister(),
deploymentLister: deploymentInformer.Lister(),
leConfig: leConfig,
loggingContext: ctx,
configs: reconcilersource.WatchConfigurations(ctx, component, cmw),
}

impl := pingsourcereconciler.NewImpl(ctx, r)
Expand Down
108 changes: 31 additions & 77 deletions pkg/reconciler/pingsource/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ limitations under the License.
package pingsource

import (
"os"
"testing"

corev1 "k8s.io/api/core/v1"
Expand All @@ -35,83 +34,38 @@ import (
)

func TestNew(t *testing.T) {
testCases := map[string]struct {
setEnv bool
}{
"image not set": {},
"image set": {
setEnv: true,
ctx, _ := SetupFakeContext(t)
c := NewController(ctx, configmap.NewStaticWatcher(
&corev1.ConfigMap{
ObjectMeta: metav1.ObjectMeta{
Name: "config-observability",
Namespace: "knative-eventing",
},
Data: map[string]string{
"_example": "test-config",
},
}, &corev1.ConfigMap{
ObjectMeta: metav1.ObjectMeta{
Name: "config-logging",
Namespace: "knative-eventing",
},
Data: map[string]string{
"zap-logger-config": "test-config",
"loglevel.controller": "info",
"loglevel.webhook": "info",
},
}, &corev1.ConfigMap{
ObjectMeta: metav1.ObjectMeta{
Name: "config-tracing",
Namespace: "knative-eventing",
},
Data: map[string]string{
"_example": "test-config",
},
},
}
for n, tc := range testCases {
t.Run(n, func(t *testing.T) {
if tc.setEnv {
if err := os.Setenv("PING_IMAGE", "anything"); err != nil {
t.Fatalf("Failed to set env var: %v", err)
}
if err := os.Setenv("MT_PING_IMAGE", "anything"); err != nil {
t.Fatalf("Failed to set env var: %v", err)
}
defer func() {
if err := os.Unsetenv("PING_IMAGE"); err != nil {
t.Fatalf("Failed to unset env var: %v", err)
}
if err := os.Unsetenv("MT_PING_IMAGE"); err != nil {
t.Fatalf("Failed to unset env var: %v", err)
}
}()

if err := os.Setenv("METRICS_DOMAIN", "knative.dev/eventing"); err != nil {
t.Fatalf("Failed to set env var: %v", err)
}
defer func() {
if err := os.Unsetenv("METRICS_DOMAIN"); err != nil {
t.Fatalf("Failed to unset env var: %v", err)
}
}()
} else {
defer func() {
r := recover()
if r == nil {
t.Errorf("Expected NewController to panic, nothing recovered.")
}
}()
}

ctx, _ := SetupFakeContext(t)
c := NewController(ctx, configmap.NewStaticWatcher(
&corev1.ConfigMap{
ObjectMeta: metav1.ObjectMeta{
Name: "config-observability",
Namespace: "knative-eventing",
},
Data: map[string]string{
"_example": "test-config",
},
}, &corev1.ConfigMap{
ObjectMeta: metav1.ObjectMeta{
Name: "config-logging",
Namespace: "knative-eventing",
},
Data: map[string]string{
"zap-logger-config": "test-config",
"loglevel.controller": "info",
"loglevel.webhook": "info",
},
}, &corev1.ConfigMap{
ObjectMeta: metav1.ObjectMeta{
Name: "config-tracing",
Namespace: "knative-eventing",
},
Data: map[string]string{
"_example": "test-config",
},
},
))
))

if c == nil {
t.Fatal("Expected NewController to return a non-nil value")
}
})
if c == nil {
t.Fatal("Expected NewController to return a non-nil value")
}
}
64 changes: 40 additions & 24 deletions pkg/reconciler/pingsource/pingsource.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"k8s.io/apimachinery/pkg/api/equality"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/client-go/kubernetes"
"k8s.io/utils/pointer"

appsv1listers "k8s.io/client-go/listers/apps/v1"
duckv1 "knative.dev/pkg/apis/duck/v1"
Expand All @@ -48,12 +49,12 @@ import (

const (
// Name of the corev1.Events emitted from the reconciliation process
pingSourceDeploymentCreated = "PingSourceDeploymentCreated"
pingSourceDeploymentUpdated = "PingSourceDeploymentUpdated"

component = "pingsource"
mtcomponent = "pingsource-mt-adapter"
mtadapterName = "pingsource-mt-adapter"
containerName = "dispatcher"
)

func newWarningSinkNotFound(sink *duckv1.Destination) pkgreconciler.Event {
Expand All @@ -64,8 +65,6 @@ func newWarningSinkNotFound(sink *duckv1.Destination) pkgreconciler.Event {
type Reconciler struct {
kubeClientSet kubernetes.Interface

receiveAdapterImage string

// listers index properties about resources
pingLister listers.PingSourceLister
deploymentLister appsv1listers.DeploymentLister
Expand Down Expand Up @@ -152,42 +151,59 @@ func (r *Reconciler) reconcileReceiveAdapter(ctx context.Context, source *v1beta
}

args := resources.Args{
ServiceAccountName: mtadapterName,
AdapterName: mtadapterName,
Image: r.receiveAdapterImage,
LoggingConfig: loggingConfig,
MetricsConfig: metricsConfig,
LeConfig: r.leConfig,
NoShutdownAfter: mtping.GetNoShutDownAfterValue(),
LoggingConfig: loggingConfig,
MetricsConfig: metricsConfig,
LeConfig: r.leConfig,
NoShutdownAfter: mtping.GetNoShutDownAfterValue(),
}
expected := resources.MakeReceiveAdapter(args)
expected := resources.MakeReceiveAdapterEnvVar(args)

d, err := r.deploymentLister.Deployments(system.Namespace()).Get(mtadapterName)
if err != nil {
if apierrors.IsNotFound(err) {
d, err := r.kubeClientSet.AppsV1().Deployments(system.Namespace()).Create(expected)
if err != nil {
controller.GetEventRecorder(ctx).Eventf(source, corev1.EventTypeWarning, pingSourceDeploymentCreated, "Cluster-scoped deployment not created (%v)", err)
return nil, err
}
controller.GetEventRecorder(ctx).Event(source, corev1.EventTypeNormal, pingSourceDeploymentCreated, "Cluster-scoped deployment created")
return d, nil
logging.FromContext(ctx).Errorw("pingsource adapter deployment doesn't exist", zap.Error(err))
return nil, err
}
return nil, fmt.Errorf("error getting mt adapter deployment %v", err)
} else if podSpecChanged(d.Spec.Template.Spec, expected.Spec.Template.Spec) {
d.Spec.Template.Spec = expected.Spec.Template.Spec
} else if update, c := needsUpdating(ctx, &d.Spec, expected); update {
c.Env = expected

if zero(d.Spec.Replicas) {
d.Spec.Replicas = pointer.Int32Ptr(1)
}

if d, err = r.kubeClientSet.AppsV1().Deployments(system.Namespace()).Update(d); err != nil {
return d, err
}
controller.GetEventRecorder(ctx).Event(source, corev1.EventTypeNormal, pingSourceDeploymentUpdated, "Cluster-scoped deployment updated")
controller.GetEventRecorder(ctx).Event(source, corev1.EventTypeNormal, pingSourceDeploymentUpdated, "pingsource adapter deployment updated")
return d, nil
} else {
logging.FromContext(ctx).Debugw("Reusing existing cluster-scoped deployment", zap.Any("deployment", d))
}
return d, nil
}

func podSpecChanged(oldPodSpec corev1.PodSpec, newPodSpec corev1.PodSpec) bool {
// We really care about the fields we set and ignore the test.
return !equality.Semantic.DeepDerivative(newPodSpec, oldPodSpec)
func needsUpdating(ctx context.Context, oldDeploymentSpec *appsv1.DeploymentSpec, newEnvVars []corev1.EnvVar) (bool, *corev1.Container) {
// We just care about the environment of the dispatcher container
oldPodSpec := &oldDeploymentSpec.Template.Spec
container := findContainer(oldPodSpec, containerName)
if container == nil {
logging.FromContext(ctx).Errorf("invalid %s deployment: missing the %s container", mtadapterName, containerName)
return false, nil
}

return zero(oldDeploymentSpec.Replicas) || !equality.Semantic.DeepEqual(container.Env, newEnvVars), container
}

func findContainer(podSpec *corev1.PodSpec, name string) *corev1.Container {
for i, container := range podSpec.Containers {
if container.Name == name {
return &podSpec.Containers[i]
}
}
return nil
}

func zero(i *int32) bool {
return i != nil && *i == 0
}
Loading