Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add persistent cache store backed by ConfigMap. Refactor mtping #3451

Merged
merged 12 commits into from
Jul 17, 2020
19 changes: 2 additions & 17 deletions cmd/apiserver_receive_adapter/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,10 @@ limitations under the License.
package main

import (
"context"
"fmt"
"knative.dev/pkg/signals"

"knative.dev/eventing/pkg/adapter/apiserver"
"knative.dev/eventing/pkg/adapter/v2"
"knative.dev/pkg/controller"
"knative.dev/pkg/injection"
"knative.dev/pkg/injection/sharedmain"
"knative.dev/pkg/signals"
)

const (
Expand All @@ -34,16 +29,6 @@ const (

func main() {
ctx := signals.NewContext()
cfg := sharedmain.ParseAndGetConfigOrDie()
ctx, informers := injection.Default.SetupInformers(ctx, cfg)

// Start the injection clients and informers.
go func(ctx context.Context) {
if err := controller.StartInformers(ctx.Done(), informers...); err != nil {
panic(fmt.Sprintf("Failed to start informers - %s", err))
}
<-ctx.Done()
}(ctx)

ctx = adapter.WithInjectorEnabled(ctx)
adapter.MainWithContext(ctx, component, apiserver.NewEnvConfig, apiserver.NewAdapter)
}
25 changes: 7 additions & 18 deletions cmd/mtping/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,30 +17,19 @@ limitations under the License.
package main

import (
"context"
"fmt"

"knative.dev/pkg/controller"
"knative.dev/pkg/injection"
"knative.dev/pkg/injection/sharedmain"
"knative.dev/pkg/signals"

"knative.dev/eventing/pkg/adapter/mtping"
"knative.dev/eventing/pkg/adapter/v2"
)

const (
component = "pingsource-mt-adapter"
lionelvillard marked this conversation as resolved.
Show resolved Hide resolved
)

func main() {
ctx := signals.NewContext()
cfg := sharedmain.ParseAndGetConfigOrDie()
ctx, informers := injection.Default.SetupInformers(ctx, cfg)

// Start the injection clients and informers.
go func(ctx context.Context) {
if err := controller.StartInformers(ctx.Done(), informers...); err != nil {
panic(fmt.Sprintf("Failed to start informers - %s", err))
}
<-ctx.Done()
}(ctx)

adapter.MainWithContext(ctx, "pingsource-mt-adapter", mtping.NewEnvConfig, mtping.NewAdapter)
ctx = adapter.WithConfigMapWatcherEnabled(ctx)
ctx = adapter.WithInjectorEnabled(ctx)
adapter.MainWithContext(ctx, component, mtping.NewEnvConfig, mtping.NewAdapter)
}
1 change: 1 addition & 0 deletions config/core/roles/controller-clusterroles.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ rules:
- "endpoints"
- "events"
- "serviceaccounts"
- "pods"
verbs: &everything
- "get"
- "list"
Expand Down
16 changes: 0 additions & 16 deletions config/core/roles/pingsource-mt-adapter-clusterrole.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -27,22 +27,6 @@ rules:
- "get"
- "list"
- "watch"
- apiGroups:
- sources.knative.dev
resources:
- pingsources
- pingsources/status
verbs:
- get
- list
- watch
- patch
- apiGroups:
- sources.knative.dev
resources:
- pingsources/finalizers
verbs:
- "patch"
- apiGroups:
- ""
resources:
Expand Down
24 changes: 12 additions & 12 deletions pkg/adapter/mtping/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import (

cloudevents "github.com/cloudevents/sdk-go/v2"
"go.uber.org/zap"
"knative.dev/pkg/controller"
kubeclient "knative.dev/pkg/client/injection/kube/client"
"knative.dev/pkg/logging"

"knative.dev/eventing/pkg/adapter/v2"
Expand All @@ -30,32 +30,32 @@ import (
// mtpingAdapter implements the PingSource mt adapter to sinks
type mtpingAdapter struct {
logger *zap.SugaredLogger
client cloudevents.Client
runner *cronJobsRunner
}

func NewEnvConfig() adapter.EnvConfigAccessor {
return &adapter.EnvConfig{}
}

func NewAdapter(ctx context.Context, _ adapter.EnvConfigAccessor, ceClient cloudevents.Client) adapter.Adapter {
runner := NewCronJobsRunner(ceClient, kubeclient.Get(ctx), logging.FromContext(ctx))

cmw := adapter.ConfigMapWatcherFromContext(ctx)
cmw.Watch("config-pingsource-mt-adapter", runner.updateFromConfigMap)

return &mtpingAdapter{
logger: logging.FromContext(ctx),
client: ceClient,
runner: runner,
}
}

// Start implements adapter.Adapter
func (a *mtpingAdapter) Start(ctx context.Context) error {
runner := NewCronJobsRunner(a.client, a.logger)

ctrl := NewController(ctx, runner)

a.logger.Info("Starting controllers...")
go controller.StartAll(ctx, ctrl)

a.logger.Info("Starting job runner...")
runner.Start(ctx.Done())
if err := a.runner.Start(ctx.Done()); err != nil {
return err
}

a.logger.Infof("controller and runner stopped")
a.logger.Infof("runner stopped")
return nil
}
11 changes: 9 additions & 2 deletions pkg/adapter/mtping/adapter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,17 +21,24 @@ import (
"testing"
"time"

adaptertest "knative.dev/eventing/pkg/adapter/v2/test"
_ "knative.dev/pkg/client/injection/kube/client/fake"
rectesting "knative.dev/pkg/reconciler/testing"

pkgadapter "knative.dev/eventing/pkg/adapter/v2"
adaptertest "knative.dev/eventing/pkg/adapter/v2/test"
)

func TestStartStopAdapter(t *testing.T) {
ctx, _ := rectesting.SetupFakeContext(t)
ctx, cancel := context.WithCancel(ctx)
cmw := pkgadapter.SetupConfigMapWatchOrDie(ctx, "component", "test-ns")
ctx = pkgadapter.WithConfigMapWatcher(ctx, cmw)

envCfg := NewEnvConfig()

ce := adaptertest.NewTestClient()
adapter := NewAdapter(ctx, envCfg, ce)

ctx, cancel := context.WithCancel(ctx)
done := make(chan bool)
go func(ctx context.Context) {
err := adapter.Start(ctx)
Expand Down
74 changes: 74 additions & 0 deletions pkg/adapter/mtping/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
/*
Copyright 2020 The Knative Authors

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package mtping

import (
corev1 "k8s.io/api/core/v1"

"knative.dev/eventing/pkg/apis/eventing"
"knative.dev/eventing/pkg/apis/sources/v1alpha2"
)

type PingConfig struct {
corev1.ObjectReference `json:",inline"`

// Schedule is the cronjob schedule. Defaults to `* * * * *`.
Schedule string `json:"schedule"`

// JsonData is json encoded data used as the body of the event posted to
// the sink. Default is empty. If set, datacontenttype will also be set
// to "application/json".
// +optional
JsonData string `json:"jsonData,omitempty"`

// Extensions specify what attribute are added or overridden on the
// outbound event. Each `Extensions` key-value pair are set on the event as
// an attribute extension independently.
// +optional
Extensions map[string]string `json:"extensions,omitempty"`

// SinkURI is the current active sink URI that has been configured for the
// Source.
SinkURI string `json:"sinkUri,omitempty"`
}

type PingConfigs map[string]PingConfig

// Project creates a PingConfig for the given source
func Project(i interface{}) interface{} {
obj := i.(*v1alpha2.PingSource)

if scope, ok := obj.Annotations[eventing.ScopeAnnotationKey]; ok && scope != eventing.ScopeCluster {
return nil
}

cfg := &PingConfig{
ObjectReference: corev1.ObjectReference{
Name: obj.Name,
Namespace: obj.Namespace,
UID: obj.UID,
ResourceVersion: obj.ResourceVersion,
},
Schedule: obj.Spec.Schedule,
JsonData: obj.Spec.JsonData,
SinkURI: obj.Status.SinkURI.String(),
}
if obj.Spec.CloudEventOverrides != nil {
cfg.Extensions = obj.Spec.CloudEventOverrides.Extensions
}
return cfg
}
107 changes: 107 additions & 0 deletions pkg/adapter/mtping/config_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
/*
Copyright 2020 The Knative Authors

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package mtping

import (
"testing"

"knative.dev/pkg/apis"

"github.com/google/go-cmp/cmp"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
duckv1 "knative.dev/pkg/apis/duck/v1"

sourcesv1alpha2 "knative.dev/eventing/pkg/apis/sources/v1alpha2"
)

func TestProject(t *testing.T) {
testCases := map[string]struct {
source sourcesv1alpha2.PingSource
expected PingConfig
}{
"TestAddRunRemoveSchedule": {
source: sourcesv1alpha2.PingSource{
ObjectMeta: metav1.ObjectMeta{
Name: "test-name",
Namespace: "test-ns"},

Spec: sourcesv1alpha2.PingSourceSpec{
SourceSpec: duckv1.SourceSpec{
CloudEventOverrides: nil,
},
Schedule: "* * * * ?",
JsonData: "some data",
},
Status: sourcesv1alpha2.PingSourceStatus{
duckv1.SourceStatus{
SinkURI: &apis.URL{
Host: "asink",
},
},
},
},
expected: PingConfig{
ObjectReference: corev1.ObjectReference{
Name: "test-name",
Namespace: "test-ns",
},
Schedule: "* * * * ?",
JsonData: "some data",
SinkURI: "//asink",
}},
"TestAddRunRemoveScheduleWithExtensionOverride": {
source: sourcesv1alpha2.PingSource{
ObjectMeta: metav1.ObjectMeta{
Name: "test-name",
Namespace: "test-ns"},
Spec: sourcesv1alpha2.PingSourceSpec{
SourceSpec: duckv1.SourceSpec{
Sink: duckv1.Destination{},
CloudEventOverrides: &duckv1.CloudEventOverrides{
Extensions: map[string]string{"1": "one", "2": "two"},
},
},
Schedule: "* * * * ?",
JsonData: "some data",
},
Status: sourcesv1alpha2.PingSourceStatus{
duckv1.SourceStatus{
SinkURI: &apis.URL{Host: "anothersink"},
},
},
},
expected: PingConfig{
ObjectReference: corev1.ObjectReference{
Name: "test-name",
Namespace: "test-ns",
},
Schedule: "* * * * ?",
JsonData: "some data",
Extensions: map[string]string{"1": "one", "2": "two"},
SinkURI: "//anothersink",
}},
}
for n, tc := range testCases {
t.Run(n, func(t *testing.T) {
got := Project(&tc.source)
if diff := cmp.Diff(&tc.expected, got); diff != "" {
t.Errorf("unexpected projection (-want, +got) = %v", diff)
}
})
}
}
Loading