Skip to content
This repository has been archived by the owner on Jun 19, 2022. It is now read-only.

Reconcilers for PullSubscription and Topic in intevents #1013

Merged
merged 14 commits into from
May 12, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 26 additions & 0 deletions cmd/webhook/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,9 @@ import (
"github.com/google/knative-gcp/pkg/apis/events"
eventsv1alpha1 "github.com/google/knative-gcp/pkg/apis/events/v1alpha1"
eventsv1beta1 "github.com/google/knative-gcp/pkg/apis/events/v1beta1"
"github.com/google/knative-gcp/pkg/apis/intevents"
inteventsv1alpha1 "github.com/google/knative-gcp/pkg/apis/intevents/v1alpha1"
inteventsv1beta1 "github.com/google/knative-gcp/pkg/apis/intevents/v1beta1"
"github.com/google/knative-gcp/pkg/apis/messaging"
messagingv1alpha1 "github.com/google/knative-gcp/pkg/apis/messaging/v1alpha1"
messagingv1beta1 "github.com/google/knative-gcp/pkg/apis/messaging/v1beta1"
Expand Down Expand Up @@ -58,6 +61,10 @@ var types = map[schema.GroupVersionKind]resourcesemantics.GenericCRD{
eventsv1alpha1.SchemeGroupVersion.WithKind("CloudAuditLogsSource"): &eventsv1alpha1.CloudAuditLogsSource{},
eventsv1alpha1.SchemeGroupVersion.WithKind("CloudBuildSource"): &eventsv1alpha1.CloudBuildSource{},

// For group internal.events.cloud.google.com.
inteventsv1alpha1.SchemeGroupVersion.WithKind("PullSubscription"): &inteventsv1alpha1.PullSubscription{},
inteventsv1alpha1.SchemeGroupVersion.WithKind("Topic"): &inteventsv1alpha1.Topic{},

// For group pubsub.cloud.google.com.
pubsubv1alpha1.SchemeGroupVersion.WithKind("PullSubscription"): &pubsubv1alpha1.PullSubscription{},
pubsubv1alpha1.SchemeGroupVersion.WithKind("Topic"): &pubsubv1alpha1.Topic{},
Expand Down Expand Up @@ -138,6 +145,8 @@ func NewConversionController(ctx context.Context, _ configmap.Watcher) *controll
messagingv1beta1_ = messagingv1beta1.SchemeGroupVersion.Version
pubsubv1alpha1_ = pubsubv1alpha1.SchemeGroupVersion.Version
pubsubv1beta1_ = pubsubv1beta1.SchemeGroupVersion.Version
inteventsv1alpha1_ = inteventsv1alpha1.SchemeGroupVersion.Version
inteventsv1beta1_ = inteventsv1beta1.SchemeGroupVersion.Version
)

return conversion.NewConversionController(ctx,
Expand Down Expand Up @@ -179,6 +188,23 @@ func NewConversionController(ctx context.Context, _ configmap.Watcher) *controll
eventsv1beta1_: &eventsv1beta1.CloudStorageSource{},
},
},
// intevents
inteventsv1alpha1.Kind("PullSubscription"): {
DefinitionName: intevents.PullSubscriptionsResource.String(),
HubVersion: inteventsv1alpha1_,
Zygotes: map[string]conversion.ConvertibleObject{
inteventsv1alpha1_: &inteventsv1alpha1.PullSubscription{},
inteventsv1beta1_: &inteventsv1beta1.PullSubscription{},
},
},
inteventsv1alpha1.Kind("Topic"): {
DefinitionName: intevents.TopicsResource.String(),
HubVersion: inteventsv1alpha1_,
Zygotes: map[string]conversion.ConvertibleObject{
inteventsv1alpha1_: &inteventsv1alpha1.Topic{},
inteventsv1beta1_: &inteventsv1beta1.Topic{},
},
},
// messaging
messagingv1alpha1.Kind("Channel"): {
DefinitionName: messaging.ChannelsResource.String(),
Expand Down
15 changes: 15 additions & 0 deletions pkg/apis/intevents/register.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,21 @@ limitations under the License.
// resources.
package intevents

import "k8s.io/apimachinery/pkg/runtime/schema"

const (
GroupName = "internal.events.cloud.google.com"
)

var (
// PullSubscriptionsResource represents a PullSubscription.
PullSubscriptionsResource = schema.GroupResource{
Group: GroupName,
Resource: "pullsubscriptions",
}
// TopicsResource represents a Topic.
TopicsResource = schema.GroupResource{
Group: GroupName,
Resource: "topics",
}
)
43 changes: 43 additions & 0 deletions pkg/reconciler/intevents/controller.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
/*
Copyright 2019 Google LLC

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package intevents

import (
"context"

"knative.dev/pkg/configmap"

pubsubClient "github.com/google/knative-gcp/pkg/client/injection/client"
"github.com/google/knative-gcp/pkg/reconciler"
)

func NewPubSubBase(ctx context.Context, controllerAgentName, receiveAdapterName string, cmw configmap.Watcher) *PubSubBase {
return &PubSubBase{
Base: reconciler.NewBase(ctx, controllerAgentName, cmw),
pubsubClient: pubsubClient.Get(ctx),
receiveAdapterName: receiveAdapterName,
}
}

func NewPubSubBaseWithAdapter(ctx context.Context, controllerAgentName, receiveAdapterName string, adapterType string, cmw configmap.Watcher) *PubSubBase {
return &PubSubBase{
Base: reconciler.NewBase(ctx, controllerAgentName, cmw),
pubsubClient: pubsubClient.Get(ctx),
receiveAdapterName: receiveAdapterName,
adapterType: adapterType,
}
}
36 changes: 36 additions & 0 deletions pkg/reconciler/intevents/controller_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/*
Copyright 2019 Google LLC

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package intevents

import (
"testing"

"knative.dev/pkg/configmap"
logtesting "knative.dev/pkg/logging/testing"
. "knative.dev/pkg/reconciler/testing"
)

func TestNew(t *testing.T) {
defer logtesting.ClearAll()
ctx, _ := SetupFakeContext(t)

c := NewPubSubBase(ctx, "test-controller", "test-ra", configmap.NewStaticWatcher())

if c == nil {
t.Fatal("Expected NewPubSubBase to return a non-nil value")
}
}
18 changes: 18 additions & 0 deletions pkg/reconciler/intevents/pullsubscription/doc.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
/*
Copyright 2019 Google LLC

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

// Package pullsubscription implements the Pub/Sub PullSubscription controllers.
package pullsubscription
143 changes: 143 additions & 0 deletions pkg/reconciler/intevents/pullsubscription/keda/controller.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,143 @@
/*
Copyright 2020 Google LLC

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package keda

import (
"context"

"go.uber.org/zap"
"k8s.io/client-go/discovery"
"k8s.io/client-go/tools/cache"

duckv1alpha1 "github.com/google/knative-gcp/pkg/apis/duck/v1alpha1"
"github.com/google/knative-gcp/pkg/apis/intevents/v1alpha1"
"github.com/google/knative-gcp/pkg/client/injection/ducks/duck/v1alpha1/resource"
pullsubscriptioninformers "github.com/google/knative-gcp/pkg/client/injection/informers/intevents/v1alpha1/pullsubscription"
pullsubscriptionreconciler "github.com/google/knative-gcp/pkg/client/injection/reconciler/intevents/v1alpha1/pullsubscription"
gpubsub "github.com/google/knative-gcp/pkg/gclient/pubsub"
"github.com/google/knative-gcp/pkg/reconciler"
"github.com/google/knative-gcp/pkg/reconciler/identity"
"github.com/google/knative-gcp/pkg/reconciler/identity/iam"
"github.com/google/knative-gcp/pkg/reconciler/intevents"
psreconciler "github.com/google/knative-gcp/pkg/reconciler/intevents/pullsubscription"
"github.com/kelseyhightower/envconfig"

eventingduck "knative.dev/eventing/pkg/duck"
deploymentinformer "knative.dev/pkg/client/injection/kube/informers/apps/v1/deployment"
serviceaccountinformers "knative.dev/pkg/client/injection/kube/informers/core/v1/serviceaccount"
"knative.dev/pkg/configmap"
"knative.dev/pkg/controller"
"knative.dev/pkg/logging"
"knative.dev/pkg/metrics"
pkgreconciler "knative.dev/pkg/reconciler"
"knative.dev/pkg/resolver"
tracingconfig "knative.dev/pkg/tracing/config"
)

const (
// reconcilerName is the name of the reconciler
reconcilerName = "KedaPullSubscriptions"

// controllerAgentName is the string used by this controller to identify
// itself when creating events.
controllerAgentName = "cloud-run-events-pubsub-keda-pullsubscription-controller"

resourceGroup = "pullsubscriptions.internal.events.cloud.google.com"
)

type envConfig struct {
// ReceiveAdapter is the receive adapters image. Required.
ReceiveAdapter string `envconfig:"PUBSUB_RA_IMAGE" required:"true"`
}

// NewController initializes the controller and is called by the generated code
// Registers event handlers to enqueue events
func NewController(
ctx context.Context,
cmw configmap.Watcher,
) *controller.Impl {
return newControllerWithIAMPolicyManager(
ctx,
cmw,
iam.DefaultIAMPolicyManager())
}

func newControllerWithIAMPolicyManager(
ctx context.Context,
cmw configmap.Watcher,
ipm iam.IAMPolicyManager,
) *controller.Impl {
deploymentInformer := deploymentinformer.Get(ctx)
pullSubscriptionInformer := pullsubscriptioninformers.Get(ctx)
serviceAccountInformer := serviceaccountinformers.Get(ctx)

logger := logging.FromContext(ctx).Named(controllerAgentName).Desugar()

var env envConfig
if err := envconfig.Process("", &env); err != nil {
logger.Fatal("Failed to process env var", zap.Error(err))
}

pubsubBase := &intevents.PubSubBase{
Base: reconciler.NewBase(ctx, controllerAgentName, cmw),
}

r := &Reconciler{
Base: &psreconciler.Base{
PubSubBase: pubsubBase,
Identity: identity.NewIdentity(ctx, ipm),
DeploymentLister: deploymentInformer.Lister(),
PullSubscriptionLister: pullSubscriptionInformer.Lister(),
ReceiveAdapterImage: env.ReceiveAdapter,
CreateClientFn: gpubsub.NewClient,
ControllerAgentName: controllerAgentName,
ResourceGroup: resourceGroup,
},
}

impl := pullsubscriptionreconciler.NewImpl(ctx, r)

pubsubBase.Logger.Info("Setting up event handlers")
onlyKedaScaler := pkgreconciler.AnnotationFilterFunc(duckv1alpha1.AutoscalingClassAnnotation, duckv1alpha1.KEDA, false)

pullSubscriptionHandler := cache.FilteringResourceEventHandler{
FilterFunc: onlyKedaScaler,
Handler: controller.HandleAll(impl.Enqueue),
}
pullSubscriptionInformer.Informer().AddEventHandlerWithResyncPeriod(pullSubscriptionHandler, reconciler.DefaultResyncPeriod)

deploymentInformer.Informer().AddEventHandler(cache.FilteringResourceEventHandler{
FilterFunc: onlyKedaScaler,
Handler: controller.HandleAll(impl.EnqueueControllerOf),
})

serviceAccountInformer.Informer().AddEventHandler(cache.FilteringResourceEventHandler{
FilterFunc: controller.FilterGroupVersionKind(v1alpha1.SchemeGroupVersion.WithKind("Pullsubscription")),
Handler: controller.HandleAll(impl.EnqueueControllerOf),
})

r.UriResolver = resolver.NewURIResolver(ctx, impl.EnqueueKey)
r.ReconcileDataPlaneFn = r.ReconcileScaledObject
r.scaledObjectTracker = eventingduck.NewListableTracker(ctx, resource.Get, impl.EnqueueKey, controller.GetTrackerLease(ctx))
r.discoveryFn = discovery.ServerSupportsVersion

cmw.Watch(logging.ConfigMapName(), r.UpdateFromLoggingConfigMap)
cmw.Watch(metrics.ConfigMapName(), r.UpdateFromMetricsConfigMap)
cmw.Watch(tracingconfig.ConfigName, r.UpdateFromTracingConfigMap)

return impl
}
Loading