Skip to content

Commit

Permalink
Merge 764edf1 into abd6f98
Browse files Browse the repository at this point in the history
  • Loading branch information
zhaojizhuang authored Mar 8, 2021
2 parents abd6f98 + 764edf1 commit d319e5a
Show file tree
Hide file tree
Showing 6 changed files with 54 additions and 72 deletions.
2 changes: 1 addition & 1 deletion pkg/adapter/mtping/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ func (a *mtpingAdapter) Update(ctx context.Context, source *v1beta2.PingSource)
a.entryidMu.Unlock()
}

func (a *mtpingAdapter) Remove(ctx context.Context, source *v1beta2.PingSource) {
func (a *mtpingAdapter) Remove(source *v1beta2.PingSource) {
key := fmt.Sprintf("%s/%s", source.Namespace, source.Name)

a.entryidMu.RLock()
Expand Down
2 changes: 1 addition & 1 deletion pkg/adapter/mtping/adapter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ func TestUpdateRemoveAdapter(t *testing.T) {
t.Error(`Expected cron entries to contain "test-ns/test-name"`)
}

adapter.Remove(ctx, &v1beta2.PingSource{
adapter.Remove(&v1beta2.PingSource{
ObjectMeta: metav1.ObjectMeta{
Name: "test-name",
Namespace: "test-ns",
Expand Down
17 changes: 11 additions & 6 deletions pkg/adapter/mtping/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,15 @@ package mtping
import (
"context"

"knative.dev/pkg/reconciler"

"knative.dev/pkg/controller"
"knative.dev/pkg/logging"
"k8s.io/client-go/tools/cache"

"knative.dev/eventing/pkg/adapter/v2"
"knative.dev/eventing/pkg/apis/sources/v1beta2"
pingsourceinformer "knative.dev/eventing/pkg/client/injection/informers/sources/v1beta2/pingsource"
pingsourcereconciler "knative.dev/eventing/pkg/client/injection/reconciler/sources/v1beta2/pingsource"
"knative.dev/pkg/controller"
"knative.dev/pkg/logging"
"knative.dev/pkg/reconciler"
)

// TODO: code generation
Expand All @@ -38,7 +38,7 @@ type MTAdapter interface {
Update(ctx context.Context, source *v1beta2.PingSource)

// Remove is called when the source has been deleted.
Remove(ctx context.Context, source *v1beta2.PingSource)
Remove(source *v1beta2.PingSource)

// RemoveAll is called when the adapter stopped leading
RemoveAll(ctx context.Context)
Expand All @@ -64,6 +64,11 @@ func NewController(ctx context.Context, adapter adapter.Adapter) *controller.Imp
})

logging.FromContext(ctx).Info("Setting up event handlers")
pingsourceinformer.Get(ctx).Informer().AddEventHandler(controller.HandleAll(impl.Enqueue))
pingsourceinformer.Get(ctx).Informer().AddEventHandler(
cache.ResourceEventHandlerFuncs{
AddFunc: impl.Enqueue,
UpdateFunc: controller.PassNew(impl.Enqueue),
DeleteFunc: r.deleteFunc,
})
return impl
}
10 changes: 9 additions & 1 deletion pkg/adapter/mtping/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ package mtping

import (
"context"
"fmt"

"testing"

. "knative.dev/pkg/reconciler/testing"
Expand All @@ -29,14 +31,20 @@ import (
"knative.dev/eventing/pkg/apis/sources/v1beta2"
)

var removePingsource map[string]bool

type testAdapter struct {
adapter.Adapter
}

func (testAdapter) Update(context.Context, *v1beta2.PingSource) {
}

func (testAdapter) Remove(context.Context, *v1beta2.PingSource) {
func (testAdapter) Remove(p *v1beta2.PingSource) {
if removePingsource == nil {
removePingsource = make(map[string]bool)
}
removePingsource[fmt.Sprintf("%s/%s", p.Namespace, p.Name)] = true
}

func (testAdapter) RemoveAll(context.Context) {
Expand Down
25 changes: 15 additions & 10 deletions pkg/adapter/mtping/pingsource.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,10 @@ import (
"context"
"fmt"

"knative.dev/pkg/reconciler"

"knative.dev/eventing/pkg/apis/sources/v1beta2"
pingsourcereconciler "knative.dev/eventing/pkg/client/injection/reconciler/sources/v1beta2/pingsource"
"knative.dev/pkg/kmeta"
"knative.dev/pkg/reconciler"
)

// TODO: code generation
Expand All @@ -36,9 +36,6 @@ type Reconciler struct {
// Check that our Reconciler implements ReconcileKind.
var _ pingsourcereconciler.Interface = (*Reconciler)(nil)

// Check that our Reconciler implements FinalizeKind.
var _ pingsourcereconciler.Finalizer = (*Reconciler)(nil)

func (r *Reconciler) ReconcileKind(ctx context.Context, source *v1beta2.PingSource) reconciler.Event {
if !source.Status.IsReady() {
return fmt.Errorf("warning: PingSource is not ready")
Expand All @@ -50,9 +47,17 @@ func (r *Reconciler) ReconcileKind(ctx context.Context, source *v1beta2.PingSour
return nil
}

func (r *Reconciler) FinalizeKind(ctx context.Context, source *v1beta2.PingSource) reconciler.Event {
// Update the adapter state
r.mtadapter.Remove(ctx, source)

return nil
func (r *Reconciler) deleteFunc(obj interface{}) {
if obj == nil {
return
}
acc, err := kmeta.DeletionHandlingAccessor(obj)
if err != nil {
return
}
pingSource, ok := acc.(*v1beta2.PingSource)
if !ok || pingSource == nil {
return
}
r.mtadapter.Remove(pingSource)
}
70 changes: 17 additions & 53 deletions pkg/adapter/mtping/pingsource_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,12 @@ package mtping

import (
"context"

"testing"

cloudevents "github.com/cloudevents/sdk-go/v2"

corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
clientgotesting "k8s.io/client-go/testing"
"knative.dev/eventing/pkg/apis/sources/v1beta2"
fakeeventingclient "knative.dev/eventing/pkg/client/injection/client/fake"
"knative.dev/eventing/pkg/client/injection/reconciler/sources/v1beta2/pingsource"
Expand All @@ -41,14 +39,13 @@ import (
)

const (
testNS = "test-namespace"
pingSourceName = "test-pingsource"
testSchedule = "*/2 * * * *"
testContentType = cloudevents.TextPlain
testData = "data"
testDataBase64 = "ZGF0YQ=="
sinkName = "mysink"
defaultFinalizerName = "pingsources.sources.knative.dev"
testNS = "test-namespace"
pingSourceName = "test-pingsource"
testSchedule = "*/2 * * * *"
testContentType = cloudevents.TextPlain
testData = "data"
testDataBase64 = "ZGF0YQ=="
sinkName = "mysink"
)

var (
Expand Down Expand Up @@ -91,12 +88,6 @@ func TestAllCases(t *testing.T) {
rttestingv1beta2.WithPingSourceCloudEventAttributes,
),
},
WantEvents: []string{
Eventf(corev1.EventTypeNormal, "FinalizerUpdate", `Updated "%s" finalizers`, pingSourceName),
},
WantPatches: []clientgotesting.PatchActionImpl{
patchFinalizers(testNS, pingSourceName, defaultFinalizerName),
},
WantErr: false,
}, {
Name: "valid schedule without contentType, data and dataBase64",
Expand All @@ -116,12 +107,6 @@ func TestAllCases(t *testing.T) {
rttestingv1beta2.WithPingSourceCloudEventAttributes,
),
},
WantEvents: []string{
Eventf(corev1.EventTypeNormal, "FinalizerUpdate", `Updated "%s" finalizers`, pingSourceName),
},
WantPatches: []clientgotesting.PatchActionImpl{
patchFinalizers(testNS, pingSourceName, defaultFinalizerName),
},
WantErr: false,
}, {
Name: "valid schedule with dataBase64",
Expand All @@ -143,12 +128,6 @@ func TestAllCases(t *testing.T) {
rttestingv1beta2.WithPingSourceCloudEventAttributes,
),
},
WantEvents: []string{
Eventf(corev1.EventTypeNormal, "FinalizerUpdate", `Updated "%s" finalizers`, pingSourceName),
},
WantPatches: []clientgotesting.PatchActionImpl{
patchFinalizers(testNS, pingSourceName, defaultFinalizerName),
},
WantErr: false,
}, {
Name: "valid schedule, with finalizer",
Expand All @@ -168,7 +147,6 @@ func TestAllCases(t *testing.T) {
rttestingv1beta2.WithPingSourceDeployed,
rttestingv1beta2.WithPingSourceSink(sinkURI),
rttestingv1beta2.WithPingSourceCloudEventAttributes,
rttestingv1beta2.WithPingSourceFinalizers(defaultFinalizerName),
),
},
WantErr: false,
Expand All @@ -190,20 +168,13 @@ func TestAllCases(t *testing.T) {
rttestingv1beta2.WithPingSourceDeployed,
rttestingv1beta2.WithPingSourceSink(sinkURI),
rttestingv1beta2.WithPingSourceCloudEventAttributes,
rttestingv1beta2.WithPingSourceFinalizers(defaultFinalizerName),
rttestingv1beta2.WithPingSourceDeleted,
),
},
WantEvents: []string{
Eventf(corev1.EventTypeNormal, "FinalizerUpdate", `Updated "%s" finalizers`, pingSourceName),
},
WantPatches: []clientgotesting.PatchActionImpl{
patchFinalizers(testNS, pingSourceName, ""),
},
WantErr: false,
}, {
Name: "valid schedule, deleted without finalizer",
Key: pingsourceKey,
Key: "a/a",
Objects: []runtime.Object{
rttestingv1beta2.NewPingSource(pingSourceName, testNS,
rttestingv1beta2.WithPingSourceSpec(v1beta2.PingSourceSpec{
Expand Down Expand Up @@ -237,20 +208,13 @@ func TestAllCases(t *testing.T) {

}

func patchFinalizers(namespace, name string, finalizers string) clientgotesting.PatchActionImpl {
fstr := ""
if finalizers != "" {
fstr = `"` + finalizers + `"`
}
return clientgotesting.PatchActionImpl{
ActionImpl: clientgotesting.ActionImpl{
Namespace: namespace,
Verb: "patch",
Resource: schema.GroupVersionResource{Group: "sources.knative.dev", Version: "v1beta1", Resource: "pingsources"},
Subresource: "",
},
Name: name,
PatchType: "application/merge-patch+json",
Patch: []byte(`{"metadata":{"finalizers":[` + fstr + `],"resourceVersion":""}}`),
func TestReconciler_deleteFunc(t *testing.T) {
pingsourceKey := testNS + "/" + pingSourceName
adapter := &testAdapter{}
p := rttestingv1beta2.NewPingSource(pingSourceName, testNS)
r := &Reconciler{mtadapter: adapter}
r.deleteFunc(p)
if _, ok := removePingsource[pingsourceKey]; !ok {
t.Errorf("Got error when call deleteFunc")
}
}

0 comments on commit d319e5a

Please sign in to comment.