Skip to content

Commit

Permalink
use tracker
Browse files Browse the repository at this point in the history
  • Loading branch information
lionelvillard committed Mar 11, 2020
1 parent 84aab70 commit 32668b6
Show file tree
Hide file tree
Showing 3 changed files with 30 additions and 8 deletions.
23 changes: 15 additions & 8 deletions pkg/reconciler/pingsource/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,10 @@ package controller
import (
"context"

appsv1 "k8s.io/api/apps/v1"

"knative.dev/pkg/tracker"

"github.com/kelseyhightower/envconfig"
"k8s.io/client-go/tools/cache"
"knative.dev/eventing/pkg/reconciler"
Expand Down Expand Up @@ -68,12 +72,13 @@ func NewController(
pingLister: pingSourceInformer.Lister(),
deploymentLister: deploymentInformer.Lister(),
eventTypeLister: eventTypeInformer.Lister(),
loggingContext: ctx,

loggingContext: ctx,
}

env := &envConfig{}
if err := envconfig.Process("", env); err != nil {
r.Logger.Panicf("unable to process CronJobSource's required environment variables: %v", err)
r.Logger.Panicf("unable to process PingSourceSource's required environment variables: %v", err)
}
r.receiveAdapterImage = env.Image
r.jobRunnerImage = env.JobRunnerImage
Expand All @@ -91,15 +96,17 @@ func NewController(
Handler: controller.HandleAll(impl.EnqueueControllerOf),
})

// Watch for the global deployment and propagate
// any changes to all PingSources relying on it.
gr := func(obj interface{}) {
impl.GlobalResync(pingSourceInformer.Informer())
}
// Tracker is used to notify us that the jobrunner Deployment has changed so that
// we can reconcile PingSources that depends on it
r.tracker = tracker.New(impl.EnqueueKey, controller.GetTrackerLease(ctx))

deploymentInformer.Informer().AddEventHandler(cache.FilteringResourceEventHandler{
FilterFunc: controller.FilterWithNameAndNamespace(system.Namespace(), jobRunnerName),
Handler: controller.HandleAll(gr),
Handler: controller.HandleAll(
controller.EnsureTypeMeta(
r.tracker.OnChanged,
appsv1.SchemeGroupVersion.WithKind("Deployment"),
)),
})

eventTypeInformer.Informer().AddEventHandler(cache.FilteringResourceEventHandler{
Expand Down
12 changes: 12 additions & 0 deletions pkg/reconciler/pingsource/controller/pingsource.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import (
pkgreconciler "knative.dev/pkg/reconciler"
"knative.dev/pkg/resolver"
"knative.dev/pkg/system"
"knative.dev/pkg/tracker"

"knative.dev/eventing/pkg/apis/eventing"
eventingv1alpha1 "knative.dev/eventing/pkg/apis/eventing/v1alpha1"
Expand Down Expand Up @@ -82,6 +83,9 @@ type Reconciler struct {
deploymentLister appsv1listers.DeploymentLister
eventTypeLister eventinglisters.EventTypeLister

// tracking jobrunner deployment changes
tracker tracker.Interface

loggingContext context.Context
sinkResolver *resolver.URIResolver
loggingConfig *pkgLogging.Config
Expand Down Expand Up @@ -138,6 +142,14 @@ func (r *Reconciler) ReconcileKind(ctx context.Context, source *v1alpha1.PingSou
return err
}
source.Status.PropagateDeploymentAvailability(d)

// Tell tracker to reconcile this PingSource whenever the deployment changes
r.tracker.TrackReference(tracker.Reference{
APIVersion: d.APIVersion,
Kind: d.Kind,
Namespace: d.Namespace,
Name: d.Name,
}, source)
} else {
ra, err := r.createReceiveAdapter(ctx, source, sinkURI)
if err != nil {
Expand Down
3 changes: 3 additions & 0 deletions pkg/reconciler/pingsource/controller/pingsource_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ import (
"fmt"
"testing"

"knative.dev/pkg/tracker"

"knative.dev/pkg/system"

"knative.dev/eventing/pkg/client/injection/reconciler/sources/v1alpha1/pingsource"
Expand Down Expand Up @@ -602,6 +604,7 @@ func TestAllCases(t *testing.T) {
pingLister: listers.GetPingSourceLister(),
deploymentLister: listers.GetDeploymentLister(),
eventTypeLister: listers.GetEventTypeLister(),
tracker: tracker.New(func(types.NamespacedName) {}, 0),
receiveAdapterImage: image,
}
r.sinkResolver = resolver.NewURIResolver(ctx, func(types.NamespacedName) {})
Expand Down

0 comments on commit 32668b6

Please sign in to comment.