Skip to content

Commit

Permalink
changes from review and fix configmap label
Browse files Browse the repository at this point in the history
  • Loading branch information
lionelvillard committed Jul 15, 2020
1 parent cb1c7c4 commit 2249f46
Show file tree
Hide file tree
Showing 6 changed files with 24 additions and 10 deletions.
2 changes: 1 addition & 1 deletion cmd/mtping/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,5 +31,5 @@ func main() {
ctx := signals.NewContext()
ctx = adapter.WithConfigMapWatcherEnabled(ctx)
ctx = adapter.WithInjectorEnabled(ctx)
adapter.MainWithContext(ctx, "pingsource-mt-adapter", mtping.NewEnvConfig, mtping.NewAdapter)
adapter.MainWithContext(ctx, component, mtping.NewEnvConfig, mtping.NewAdapter)
}
4 changes: 3 additions & 1 deletion pkg/adapter/mtping/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,9 @@ func NewAdapter(ctx context.Context, _ adapter.EnvConfigAccessor, ceClient cloud
// Start implements adapter.Adapter
func (a *mtpingAdapter) Start(ctx context.Context) error {
a.logger.Info("Starting job runner...")
a.runner.Start(ctx.Done())
if err := a.runner.Start(ctx.Done()); err != nil {
return err
}

a.logger.Infof("runner stopped")
return nil
Expand Down
5 changes: 5 additions & 0 deletions pkg/adapter/mtping/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,7 @@ func message(body string) interface{} {
}

func (a *cronJobsRunner) updateFromConfigMap(cm *corev1.ConfigMap) {
a.Logger.Info("synchronizing configmap")
data, ok := cm.Data[cache.ResourcesKey]
if !ok {
// Shouldn't happened.
Expand Down Expand Up @@ -165,14 +166,17 @@ func (a *cronJobsRunner) updateFromConfigMap(cm *corev1.ConfigMap) {
// Is the schedule already cached?
if cfgid, ok := a.entryids[key]; ok {
if !equality.Semantic.DeepEqual(cfgid.config, cfg) {
a.Logger.Infof("updating schedule", zap.Any("key", key))
// Recreate cronjob
a.RemoveSchedule(cfgid.entryID)
cfgid.entryID = a.AddSchedule(cfg)
cfgid.config = &cfg

} else {
// cron jon exists and correctly configure. noop.
}
} else {
a.Logger.Infof("adding schedule", zap.Any("key", key))
// Create cronjob
a.entryids[key] = entryIdConfig{
entryID: a.AddSchedule(cfg),
Expand All @@ -185,6 +189,7 @@ func (a *cronJobsRunner) updateFromConfigMap(cm *corev1.ConfigMap) {

for key := range keys {
if cfgid, ok := a.entryids[key]; ok {
a.Logger.Infof("deleting schedule", zap.Any("key", key))
a.RemoveSchedule(cfgid.entryID)
}
}
Expand Down
1 change: 0 additions & 1 deletion pkg/reconciler/pingsource/pingsource.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,6 @@ const (
component = "pingsource"
mtcomponent = "pingsource-mt-adapter"
mtadapterName = "pingsource-mt-adapter"
mtconfigmapName = "config-pingsource-mt-adapter"
stadapterClusterRoleName = "knative-eventing-pingsource-adapter"
)

Expand Down
2 changes: 1 addition & 1 deletion pkg/utils/cache/persisted_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,7 @@ func (p *persistedStore) load() (*corev1.ConfigMap, error) {
ObjectMeta: metav1.ObjectMeta{
Name: p.name,
Namespace: p.namespace,
Annotations: map[string]string{
Labels: map[string]string{
ComponentLabelKey: p.component,
},
},
Expand Down
20 changes: 14 additions & 6 deletions pkg/utils/cache/persisted_store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,12 +58,12 @@ func TestPersistedStore(t *testing.T) {
cmName = "test-cm-name"
)
cs := fake.NewSimpleClientset()
created := make(chan struct{})
created := make(chan runtime.Object)
updated := make(chan runtime.Object)
done := make(chan bool)
cs.PrependReactor("create", "configmaps",
func(action ktesting.Action) (bool, runtime.Object, error) {
close(created)
created <- action.(ktesting.CreateAction).GetObject()
return false, nil, nil
},
)
Expand Down Expand Up @@ -93,8 +93,12 @@ func TestPersistedStore(t *testing.T) {
informer.Add(kr)

select {
case <-created:
case obj := <-created:
// We expect the configmap to be created.
cm := obj.(*corev1.ConfigMap)
if value, ok := cm.Labels[ComponentLabelKey]; !ok || value != "my-component" {
t.Fatalf("Missing %s label. Got %v", ComponentLabelKey, cm)
}
case <-time.After(1 * time.Second):
t.Fatal("Timed out waiting for configmap creation.")
}
Expand Down Expand Up @@ -158,12 +162,12 @@ func TestPersistedStoreInterrupted(t *testing.T) {
cmName = "test-cm-name"
)
cs := fake.NewSimpleClientset()
created := make(chan struct{})
created := make(chan runtime.Object)
updated := make(chan runtime.Object)
cs.PrependReactor("create", "configmaps",
func(action ktesting.Action) (bool, runtime.Object, error) {
time.Sleep(1 * time.Second)
close(created)
created <- action.(ktesting.CreateAction).GetObject()
return false, nil, nil
},
)
Expand Down Expand Up @@ -196,8 +200,12 @@ func TestPersistedStoreInterrupted(t *testing.T) {
informer.Add(kr2) // interrupt

select {
case <-created:
case obj := <-created:
// We expect the configmap to be created.
cm := obj.(*corev1.ConfigMap)
if value, ok := cm.Labels[ComponentLabelKey]; !ok || value != "my-component" {
t.Fatalf("Missing %s label. Got %v", ComponentLabelKey, cm)
}
case <-time.After(2 * time.Second):
t.Fatal("Timed out waiting for configmap creation.")
}
Expand Down

0 comments on commit 2249f46

Please sign in to comment.