Skip to content
This repository has been archived by the owner on Jun 19, 2022. It is now read-only.

Commit

Permalink
Add labels for topics and pullsubscription (#542)
Browse files Browse the repository at this point in the history
* add labels to args for creating topics and pullsubscription

* fix tests and add labels.go for reconciler/pubsub/resources

* fix tests

* address review comments
  • Loading branch information
Nicolas Lopez authored Feb 11, 2020
1 parent ee782d0 commit cda1f5c
Show file tree
Hide file tree
Showing 13 changed files with 237 additions and 98 deletions.
6 changes: 4 additions & 2 deletions pkg/reconciler/events/auditlogs/auditlogs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,8 @@ func TestAllCases(t *testing.T) {
PropagationPolicy: "CreateDelete",
}),
WithTopicLabels(map[string]string{
"receive-adapter": receiveAdapterName,
"receive-adapter": receiveAdapterName,
"events.cloud.google.com/source-name": sourceName,
}),
WithTopicOwnerReferences([]metav1.OwnerReference{sourceOwnerRef(sourceName, sourceUID)}),
),
Expand Down Expand Up @@ -371,7 +372,8 @@ func TestAllCases(t *testing.T) {
}),
WithPullSubscriptionSink(sinkGVK, sinkName),
WithPullSubscriptionLabels(map[string]string{
"receive-adapter": receiveAdapterName,
"receive-adapter": receiveAdapterName,
"events.cloud.google.com/source-name": sourceName,
}),
WithPullSubscriptionAnnotations(map[string]string{
"metrics-resource-group": resourceGroup,
Expand Down
16 changes: 8 additions & 8 deletions pkg/reconciler/events/pubsub/pubsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,14 +150,14 @@ func (r *Reconciler) reconcilePullSubscription(ctx context.Context, source *v1al
return nil, fmt.Errorf("failed to get PullSubscription: %w", err)
}
args := &resources.PullSubscriptionArgs{
Namespace: source.Namespace,
Name: source.Name,
Spec: &source.Spec.PubSubSpec,
Owner: source,
Topic: source.Spec.Topic,
ReceiveAdapter: r.receiveAdapterName,
ResourceGroup: resourceGroup,
Mode: pubsubv1alpha1.ModePushCompatible,
Namespace: source.Namespace,
Name: source.Name,
Spec: &source.Spec.PubSubSpec,
Owner: source,
Topic: source.Spec.Topic,
ResourceGroup: resourceGroup,
Mode: pubsubv1alpha1.ModePushCompatible,
Labels: resources.GetLabels(r.receiveAdapterName, source.Name),
}
newPS := resources.MakePullSubscription(args)
logging.FromContext(ctx).Desugar().Debug("Creating PullSubscription", zap.Any("ps", newPS))
Expand Down
3 changes: 2 additions & 1 deletion pkg/reconciler/events/pubsub/pubsub_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,8 @@ func TestAllCases(t *testing.T) {
WithPullSubscriptionSink(sinkGVK, sinkName),
WithPullSubscriptionMode(pubsubv1alpha1.ModePushCompatible),
WithPullSubscriptionLabels(map[string]string{
"receive-adapter": receiveAdapterName,
"receive-adapter": receiveAdapterName,
"events.cloud.google.com/source-name": pubsubName,
}),
WithPullSubscriptionAnnotations(map[string]string{
"metrics-resource-group": resourceGroup,
Expand Down
7 changes: 4 additions & 3 deletions pkg/reconciler/events/scheduler/scheduler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,8 @@ func TestAllCases(t *testing.T) {
PropagationPolicy: "CreateDelete",
}),
WithTopicLabels(map[string]string{
"receive-adapter": receiveAdapterName,
"receive-adapter": receiveAdapterName,
"events.cloud.google.com/source-name": schedulerName,
}),
WithTopicOwnerReferences([]metav1.OwnerReference{ownerRef()}),
),
Expand Down Expand Up @@ -434,8 +435,8 @@ func TestAllCases(t *testing.T) {
}),
WithPullSubscriptionSink(sinkGVK, sinkName),
WithPullSubscriptionLabels(map[string]string{
"receive-adapter": receiveAdapterName,
}),
"receive-adapter": receiveAdapterName,
"events.cloud.google.com/source-name": schedulerName}),
WithPullSubscriptionAnnotations(map[string]string{
"metrics-resource-group": resourceGroup,
}),
Expand Down
6 changes: 4 additions & 2 deletions pkg/reconciler/events/storage/storage_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,8 @@ func TestAllCases(t *testing.T) {
PropagationPolicy: "CreateDelete",
}),
WithTopicLabels(map[string]string{
"receive-adapter": receiveAdapterName,
"receive-adapter": receiveAdapterName,
"events.cloud.google.com/source-name": storageName,
}),
WithTopicOwnerReferences([]metav1.OwnerReference{ownerRef()}),
),
Expand Down Expand Up @@ -416,7 +417,8 @@ func TestAllCases(t *testing.T) {
}),
WithPullSubscriptionSink(sinkGVK, sinkName),
WithPullSubscriptionLabels(map[string]string{
"receive-adapter": receiveAdapterName,
"receive-adapter": receiveAdapterName,
"events.cloud.google.com/source-name": storageName,
}),
WithPullSubscriptionAnnotations(map[string]string{
"metrics-resource-group": resourceGroup,
Expand Down
28 changes: 18 additions & 10 deletions pkg/reconciler/pubsub/reconciler.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ import (
"go.uber.org/zap"
corev1 "k8s.io/api/core/v1"
apierrs "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/apis/meta/v1"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"knative.dev/pkg/apis"
"knative.dev/pkg/logging"
)
Expand Down Expand Up @@ -69,7 +69,15 @@ func (psb *PubSubBase) ReconcilePubSub(ctx context.Context, pubsubable duck.PubS
logging.FromContext(ctx).Desugar().Error("Failed to get Topics", zap.Error(err))
return nil, nil, fmt.Errorf("failed to get Topics: %w", err)
}
newTopic := resources.MakeTopic(namespace, name, spec, pubsubable, topic, psb.receiveAdapterName)
args := &resources.TopicArgs{
Namespace: namespace,
Name: name,
Spec: spec,
Owner: pubsubable,
Topic: topic,
Labels: resources.GetLabels(psb.receiveAdapterName, name),
}
newTopic := resources.MakeTopic(args)
t, err = topics.Create(newTopic)
if err != nil {
logging.FromContext(ctx).Desugar().Error("Failed to create Topic", zap.Any("topic", newTopic), zap.Error(err))
Expand All @@ -92,14 +100,14 @@ func (psb *PubSubBase) ReconcilePubSub(ctx context.Context, pubsubable duck.PubS
return t, nil, fmt.Errorf("failed to get Pullsubscription: %w", err)
}
args := &resources.PullSubscriptionArgs{
Namespace: namespace,
Name: name,
Spec: spec,
Owner: pubsubable,
Topic: topic,
ReceiveAdapter: psb.receiveAdapterName,
AdapterType: psb.adapterType,
ResourceGroup: resourceGroup,
Namespace: namespace,
Name: name,
Spec: spec,
Owner: pubsubable,
Topic: topic,
AdapterType: psb.adapterType,
ResourceGroup: resourceGroup,
Labels: resources.GetLabels(psb.receiveAdapterName, name),
}
newPS := resources.MakePullSubscription(args)
ps, err = pullSubscriptions.Create(newPS)
Expand Down
Loading

0 comments on commit cda1f5c

Please sign in to comment.