Skip to content
This repository has been archived by the owner on Jun 19, 2022. It is now read-only.

Reconcilers delete old Topics and PullSubscriptions #1066

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
39 commits
Select commit Hold shift + click to select a range
af878a7
Copy the PubSub PullSubscription and Topic controllers.
Harwayne May 1, 2020
b330382
Prefix PullSubscription and Topic functions with PubSub.
Harwayne May 2, 2020
b3eec6e
Listers too.
Harwayne May 2, 2020
1fb95fd
Add the testing methods.
Harwayne May 2, 2020
34066a8
Merge branch 'master' into 905-new-reconcilers
Harwayne May 4, 2020
e969716
Add to webhook.
Harwayne May 4, 2020
ddbccef
Merge branch 'master' into 905-new-reconcilers
Harwayne May 6, 2020
061bcad
Merge branch 'master' into 905-new-reconcilers
Harwayne May 6, 2020
e8c4a54
Add back the reconciler testing helpers, accidentally removed in a pr…
Harwayne May 6, 2020
db5f6c5
Merge branch 'master' into 905-new-reconcilers
Harwayne May 8, 2020
153c166
Merge branch 'master' into 905-new-reconcilers
Harwayne May 8, 2020
d47b8b6
Switch to the intevents API group.
Harwayne May 8, 2020
6138a61
Move generic pubsub reconciler.
Harwayne May 8, 2020
2885d49
Switch PubSubBase to intevents.
Harwayne May 8, 2020
9344191
Switch to the intevents base reconciler.
Harwayne May 8, 2020
ecba4d4
Unit tests.
Harwayne May 8, 2020
76f2fbd
Missed entries.
Harwayne May 8, 2020
330a06e
Readd the pubsub generic reconciler, used only by components in pkg/r…
Harwayne May 8, 2020
0c1ef9f
Merge branch 'master' into 905-new-reconcilers
Harwayne May 8, 2020
66ac06b
Merge branch 'master' into 905-move-to-new-crds
Harwayne May 8, 2020
1c85941
Merge branch 'master' into 905-new-reconcilers
Harwayne May 11, 2020
f237cbf
Copy over the generic pubsub reconciler.
Harwayne May 11, 2020
77ac113
Merge branch '905-new-reconcilers' into 905-move-to-new-crds
Harwayne May 11, 2020
f5c2088
Merge branch 'master' into 905-move-to-new-crds
Harwayne May 12, 2020
0bba6cd
Remove unused listers.
Harwayne May 12, 2020
ff9e50f
Add the new CRD definitions.
Harwayne May 12, 2020
ef1def0
Add to controller.
Harwayne May 12, 2020
8df578f
Switch tests.
Harwayne May 12, 2020
9324a1f
Merge branch 'master' into 905-move-to-new-crds
Harwayne May 12, 2020
df2738a
Extract reoncileTopic.
Harwayne May 12, 2020
347ed29
Delete old COs when the new CO is ready=true.
Harwayne May 12, 2020
e4c1926
Topics set NoDelete before deletion.
Harwayne May 13, 2020
9b43dee
Merge branch 'master' into 905-reconcilers-delete-old-crds
Harwayne May 15, 2020
0953312
Unit tests.
Harwayne May 16, 2020
1f14fa4
Merge branch 'master' into 905-reconcilers-delete-old-crds
Harwayne May 18, 2020
80fdaa2
Merge branch 'master' into 905-reconcilers-delete-old-crds
Harwayne May 20, 2020
18f86ad
Merge branch 'master' into 905-reconcilers-delete-old-crds
Harwayne May 20, 2020
fa20ff2
Merge branch 'master' into 905-reconcilers-delete-old-crds
Harwayne May 21, 2020
31d5d40
PR comments.
Harwayne May 21, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions pkg/duck/v1alpha1/identifiable.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,17 @@ limitations under the License.
package v1alpha1

import (
"k8s.io/apimachinery/pkg/runtime"
"knative.dev/pkg/apis"
"knative.dev/pkg/kmeta"

duckv1alpha1 "github.com/google/knative-gcp/pkg/apis/duck/v1alpha1"
)

type Identifiable interface {
// runtime.Object can be removed once the old Topic and PullSubscription are removed.
runtime.Object
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why is this one needed?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The event recorder requires a runtime.Object. In the deleteOldPubSub*CO methods, I write an event if the old CO is not owned by the pubsubable.


kmeta.OwnerRefable
// IdentitySpec returns the IdentitySpec portion of the Spec.
IdentitySpec() *duckv1alpha1.IdentitySpec
Expand Down
150 changes: 126 additions & 24 deletions pkg/reconciler/intevents/reconciler.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (

duckv1alpha1 "github.com/google/knative-gcp/pkg/apis/duck/v1alpha1"
inteventsv1alpha1 "github.com/google/knative-gcp/pkg/apis/intevents/v1alpha1"
pubsubv1alpha1 "github.com/google/knative-gcp/pkg/apis/pubsub/v1alpha1"
clientset "github.com/google/knative-gcp/pkg/client/clientset/versioned"
duck "github.com/google/knative-gcp/pkg/duck/v1alpha1"
"github.com/google/knative-gcp/pkg/reconciler"
Expand Down Expand Up @@ -62,39 +63,56 @@ type PubSubBase struct {
// Also sets the following fields in the pubsubable.Status upon success
// TopicID, ProjectID, and SinkURI
func (psb *PubSubBase) ReconcilePubSub(ctx context.Context, pubsubable duck.PubSubable, topic, resourceGroup string) (*inteventsv1alpha1.Topic, *inteventsv1alpha1.PullSubscription, error) {
t, err := psb.reconcileTopic(ctx, pubsubable, topic)
if err != nil {
return t, nil, err
}

ps, err := psb.ReconcilePullSubscription(ctx, pubsubable, topic, resourceGroup, false)
if err != nil {
return t, ps, err
}
return t, ps, nil
}

func (psb *PubSubBase) reconcileTopic(ctx context.Context, pubsubable duck.PubSubable, topic string) (*inteventsv1alpha1.Topic, pkgreconciler.Event) {
if pubsubable == nil {
return nil, nil, fmt.Errorf("nil pubsubable passed in")
return nil, fmt.Errorf("nil pubsubable passed in")
}
namespace := pubsubable.GetObjectMeta().GetNamespace()
name := pubsubable.GetObjectMeta().GetName()
spec := pubsubable.PubSubSpec()
annotations := pubsubable.GetObjectMeta().GetAnnotations()
status := pubsubable.PubSubStatus()

name := pubsubable.GetObjectMeta().GetName()
args := &resources.TopicArgs{
Namespace: namespace,
Namespace: pubsubable.GetObjectMeta().GetNamespace(),
Name: name,
Spec: spec,
Spec: pubsubable.PubSubSpec(),
Owner: pubsubable,
Topic: topic,
Labels: resources.GetLabels(psb.receiveAdapterName, name),
Annotations: annotations,
Annotations: pubsubable.GetObjectMeta().GetAnnotations(),
}
newTopic := resources.MakeTopic(args)

topics := psb.pubsubClient.InternalV1alpha1().Topics(namespace)
t, err := topics.Get(name, v1.GetOptions{})
// The old and new Topics use the same, deterministic names. So delete the old one before
// creating the new one. They cannot both be Ready=true at the same time, so by deleting the old
// Topic, we allow the new Topic to become ready.
err := psb.deleteOldPubSubTopic(ctx, pubsubable, newTopic)
if err != nil {
if !apierrs.IsNotFound(err) {
logging.FromContext(ctx).Desugar().Error("Failed to get Topics", zap.Error(err))
return nil, nil, fmt.Errorf("failed to get Topics: %w", err)
}
logging.FromContext(ctx).Desugar().Info("Unable to delete old Topic", zap.Error(err))
return nil, err
}

topics := psb.pubsubClient.InternalV1alpha1().Topics(newTopic.Namespace)
t, err := topics.Get(newTopic.Name, v1.GetOptions{})
if apierrs.IsNotFound(err) {
logging.FromContext(ctx).Desugar().Debug("Creating Topic", zap.Any("topic", newTopic))
t, err = topics.Create(newTopic)
if err != nil {
logging.FromContext(ctx).Desugar().Error("Failed to create Topic", zap.Any("topic", newTopic), zap.Error(err))
return nil, nil, fmt.Errorf("failed to create Topic: %w", err)
return nil, fmt.Errorf("failed to create Topic: %w", err)
}
} else if err != nil {
logging.FromContext(ctx).Desugar().Error("Failed to get Topic", zap.Error(err))
return nil, fmt.Errorf("failed to get Topic: %w", err)
// Check whether the specs differ and update the Topic if so.
} else if !equality.Semantic.DeepDerivative(newTopic.Spec, t.Spec) {
// Don't modify the informers copy.
Expand All @@ -104,21 +122,17 @@ func (psb *PubSubBase) ReconcilePubSub(ctx context.Context, pubsubable duck.PubS
t, err = topics.Update(desired)
if err != nil {
logging.FromContext(ctx).Desugar().Error("Failed to update Topic", zap.Any("topic", t), zap.Error(err))
return nil, nil, fmt.Errorf("failed to update Topic: %w", err)
return nil, fmt.Errorf("failed to update Topic: %w", err)
}
}

status := pubsubable.PubSubStatus()
cs := pubsubable.ConditionSet()

if err := propagateTopicStatus(t, status, cs, topic); err != nil {
return t, nil, err
return t, err
}

ps, err := psb.ReconcilePullSubscription(ctx, pubsubable, topic, resourceGroup, false)
if err != nil {
return t, ps, err
}
return t, ps, nil
return t, nil
}

func (psb *PubSubBase) ReconcilePullSubscription(ctx context.Context, pubsubable duck.PubSubable, topic, resourceGroup string, isPushCompatible bool) (*inteventsv1alpha1.PullSubscription, pkgreconciler.Event) {
Expand Down Expand Up @@ -181,9 +195,97 @@ func (psb *PubSubBase) ReconcilePullSubscription(ctx context.Context, pubsubable
}

status.SinkURI = ps.Status.SinkURI

// The old and new PullSubscriptions can co-exist without any problems. So to bias in favor of
// double event delivery over dropped events, don't delete the old one until the new one is
// ready.
if ps.Status.IsReady() {
err = psb.deleteOldPubSubPullSubscription(ctx, pubsubable, ps)
if err != nil {
return ps, err
}
}

return ps, nil
}

func (psb *PubSubBase) deleteOldPubSubTopic(_ context.Context, pubsubable duck.PubSubable, t *inteventsv1alpha1.Topic) pkgreconciler.Event {
// TODO This will be deleted at the same time as the old pubsub.cloud.google.com CRDs. That is
// expected to happen after 0.15, before 0.16.
oldT, err := psb.pubsubClient.PubsubV1alpha1().Topics(t.Namespace).Get(t.Name, v1.GetOptions{})
if apierrs.IsNotFound(err) {
// It doesn't exist, so there is nothing to delete.
return nil
} else if err != nil {
return pkgreconciler.NewEvent(corev1.EventTypeWarning, "OldTopicGetFailed", "unable to get old Topic in the `pubsub.events.cloud.google.com` API group: %w", err)
}
if !v1.IsControlledBy(oldT, pubsubable.GetObjectMeta()) {
// If this pubsubable doesn't own it, then just ignore it. Generate an event in case users
// are interested, but do not stop reconciliation of pubsubable, nor give it a Ready=false
// status.
psb.Recorder.Eventf(pubsubable,
corev1.EventTypeWarning,
"OldTopicNotControlled",
"old Topic '%s/%s' in the `pubsub.events.cloud.google.com` API group is not controlled by this pubsubable, so won't be deleted. Actual owners: %v",
oldT.Namespace, oldT.Name, oldT.OwnerReferences)
return nil
}

// First, to make sure the Topic is not deleted in GCP, update the Topic with a new deletion
// policy.
switch pp := oldT.Spec.PropagationPolicy; pp {
case pubsubv1alpha1.TopicPolicyCreateDelete:
c := oldT.DeepCopy()
c.Spec.PropagationPolicy = pubsubv1alpha1.TopicPolicyCreateNoDelete
oldT, err = psb.pubsubClient.PubsubV1alpha1().Topics(oldT.Namespace).Update(c)
if err != nil {
return pkgreconciler.NewEvent(corev1.EventTypeWarning, "OldTopicUpdateFailed", "unable to update propagation policy on old Topic: %w", err)
}
case pubsubv1alpha1.TopicPolicyCreateNoDelete:
// Already marked for non-deletion.
break
case pubsubv1alpha1.TopicPolicyNoCreateNoDelete:
// Already marked for non-deletion.
break
default:
return pkgreconciler.NewEvent(corev1.EventTypeWarning, "OldTopicUnknownPropagationPolicy", "unknown propagation policy on old Topic: %v", pp)
}

err = psb.pubsubClient.PubsubV1alpha1().Topics(oldT.Namespace).Delete(oldT.Name, nil)
if err != nil {
return pkgreconciler.NewEvent(corev1.EventTypeWarning, "OldTopicDeletionFailed", "unable to delete old Topic in the `pubsub.events.cloud.google.com` API group: %w", err)
}
return nil
}

func (psb *PubSubBase) deleteOldPubSubPullSubscription(_ context.Context, pubsubable duck.PubSubable, ps *inteventsv1alpha1.PullSubscription) pkgreconciler.Event {
// TODO This will be deleted at the same time as the old pubsub.cloud.google.com CRDs. That is
// expected to happen after 0.15, before 0.16.
oldPS, err := psb.pubsubClient.PubsubV1alpha1().PullSubscriptions(ps.Namespace).Get(ps.Name, v1.GetOptions{})
if apierrs.IsNotFound(err) {
// It doesn't exist, so there is nothing to delete.
return nil
} else if err != nil {
return pkgreconciler.NewEvent(corev1.EventTypeWarning, "OldPullSubscriptionGetFailed", "unable to get old PullSubscription in the `pubsub.events.cloud.google.com` API group: %w", err)
}
if !v1.IsControlledBy(oldPS, pubsubable.GetObjectMeta()) {
// If this pubsubable doesn't own it, then just ignore it. Generate an event in case users
// are interested, but do not stop reconciliation of pubsubable, nor give it a Ready=false
// status.
psb.Recorder.Eventf(pubsubable,
corev1.EventTypeWarning,
"oldPullSubscriptionNotControlled",
"old PullSubscription '%s/%s' in the `pubsub.events.cloud.google.com` API group is not controlled by this pubsubable, so won't be deleted. Actual owners: %v",
oldPS.Namespace, oldPS.Name, oldPS.OwnerReferences)
return nil
}
err = psb.pubsubClient.PubsubV1alpha1().PullSubscriptions(oldPS.Namespace).Delete(oldPS.Name, nil)
if err != nil {
return pkgreconciler.NewEvent(corev1.EventTypeWarning, "OldPullSubscriptionDeletionFailed", "unable to delete old PullSubscription in the `pubsub.events.cloud.google.com` API group: %w", err)
}
return nil
}

func propagatePullSubscriptionStatus(ps *inteventsv1alpha1.PullSubscription, status *duckv1alpha1.PubSubStatus, cs *apis.ConditionSet) error {
pc := ps.Status.GetTopLevelCondition()
if pc == nil {
Expand Down
Loading