Skip to content
This repository has been archived by the owner on Jun 19, 2022. It is now read-only.

Commit

Permalink
Removing/Updating old publisher, ra, and pubsub resources (#1380)
Browse files Browse the repository at this point in the history
* removing old publisher, ra, and pubsub resources

* removing topic and creating it again for sources when the spec.topic changed.
That can only happen on updates to 0.16

* updates for channel

* nits

* deleting and creating PS again. That will take care of removing the old
RA and the old Pub/Sub subscription

* deleting and creating PS again. That will take care of removing the old
RA and the old Pub/Sub subscription

* no need to remove ScaledObject

* some review comments

* getting error deleting the sink. Not deleting it and things can keep working

* updating scheduler target pubsub topic

* fixing storage

* nits
  • Loading branch information
nachocano authored Jul 7, 2020
1 parent e16c2ee commit 80454f1
Show file tree
Hide file tree
Showing 8 changed files with 169 additions and 38 deletions.
7 changes: 6 additions & 1 deletion pkg/gclient/scheduler/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ package scheduler
import (
"context"

scheduler "cloud.google.com/go/scheduler/apiv1"
"cloud.google.com/go/scheduler/apiv1"
"github.com/googleapis/gax-go/v2"
"google.golang.org/api/option"
schedulerpb "google.golang.org/genproto/googleapis/cloud/scheduler/v1"
Expand Down Expand Up @@ -57,6 +57,11 @@ func (c *schedulerClient) CreateJob(ctx context.Context, req *schedulerpb.Create
return c.client.CreateJob(ctx, req, opts...)
}

// UpdateJob implements scheduler.CloudSchedulerClient.UpdateJobRequest
func (c *schedulerClient) UpdateJob(ctx context.Context, req *schedulerpb.UpdateJobRequest, opts ...gax.CallOption) (*schedulerpb.Job, error) {
return c.client.UpdateJob(ctx, req, opts...)
}

// DeleteJob implements scheduler.CloudSchedulerClient.DeleteJob
func (c *schedulerClient) DeleteJob(ctx context.Context, req *schedulerpb.DeleteJobRequest, opts ...gax.CallOption) error {
return c.client.DeleteJob(ctx, req, opts...)
Expand Down
2 changes: 2 additions & 0 deletions pkg/gclient/scheduler/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ type Client interface {
Close() error
// CreateJob see https://godoc.org/cloud.google.com/go/scheduler/apiv1#CloudSchedulerClient.CreateJob
CreateJob(ctx context.Context, req *schedulerpb.CreateJobRequest, opts ...gax.CallOption) (*schedulerpb.Job, error)
// UpdateJob see https://godoc.org/cloud.google.com/go/scheduler/apiv1#CloudSchedulerClient.UpdateJob
UpdateJob(ctx context.Context, req *schedulerpb.UpdateJobRequest, opts ...gax.CallOption) (*schedulerpb.Job, error)
// DeleteJob see https://godoc.org/cloud.google.com/go/scheduler/apiv1#CloudSchedulerClient.DeleteJob
DeleteJob(ctx context.Context, req *schedulerpb.DeleteJobRequest, opts ...gax.CallOption) error
// GetJob see https://godoc.org/cloud.google.com/go/scheduler/apiv1#CloudSchedulerClient.GetJob
Expand Down
11 changes: 11 additions & 0 deletions pkg/gclient/scheduler/testing/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ type TestClientData struct {
CreateClientErr error
CreateJobErr error
DeleteJobErr error
UpdateJobErr error
GetJobErr error
CloseErr error
}
Expand Down Expand Up @@ -83,6 +84,16 @@ func (c *testClient) DeleteJob(ctx context.Context, req *schedulerpb.DeleteJobRe
return c.data.DeleteJobErr
}

// UpdateJob implements client.UpdateJob
func (c *testClient) UpdateJob(ctx context.Context, req *schedulerpb.UpdateJobRequest, opts ...gax.CallOption) (*schedulerpb.Job, error) {
if c.data.UpdateJobErr != nil {
return nil, c.data.UpdateJobErr
}
return &schedulerpb.Job{
Name: req.Job.Name,
}, nil
}

// GetJob implements client.GetJob
func (c *testClient) GetJob(ctx context.Context, req *schedulerpb.GetJobRequest, opts ...gax.CallOption) (*schedulerpb.Job, error) {
if c.data.GetJobErr != nil {
Expand Down
40 changes: 32 additions & 8 deletions pkg/reconciler/events/scheduler/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,13 @@ package scheduler

import (
"context"

"go.uber.org/zap"
corev1 "k8s.io/api/core/v1"
"knative.dev/pkg/logging"
"knative.dev/pkg/reconciler"

schedulerpb "google.golang.org/genproto/googleapis/cloud/scheduler/v1"
"google.golang.org/grpc/codes"
gstatus "google.golang.org/grpc/status"
corev1 "k8s.io/api/core/v1"
"knative.dev/pkg/logging"
"knative.dev/pkg/reconciler"

"github.com/google/knative-gcp/pkg/apis/events/v1beta1"
cloudschedulersourcereconciler "github.com/google/knative-gcp/pkg/client/injection/reconciler/events/v1beta1/cloudschedulersource"
Expand Down Expand Up @@ -112,16 +110,18 @@ func (r *Reconciler) reconcileJob(ctx context.Context, scheduler *v1beta1.CloudS
}
defer client.Close()

pubsubTargetName := resources.GeneratePubSubTargetTopic(scheduler, topic)

// Check if the job exists.
_, err = client.GetJob(ctx, &schedulerpb.GetJobRequest{Name: jobName})
job, err := client.GetJob(ctx, &schedulerpb.GetJobRequest{Name: jobName})
if err != nil {
if st, ok := gstatus.FromError(err); !ok {
logging.FromContext(ctx).Desugar().Error("Failed from CloudSchedulerSource client while retrieving CloudSchedulerSource job", zap.String("jobName", jobName), zap.Error(err))
return err
} else if st.Code() == codes.NotFound {
// Create the job as it does not exist. For creation, we need a parent, extract it from the jobName.
parent := resources.ExtractParentName(jobName)
// Add our jobName, and schedulerName as customAttributes.
// Add jobName as customAttribute.
customAttributes := map[string]string{
v1beta1.CloudSchedulerSourceJobName: jobName,
}
Expand All @@ -131,7 +131,7 @@ func (r *Reconciler) reconcileJob(ctx context.Context, scheduler *v1beta1.CloudS
Name: jobName,
Target: &schedulerpb.Job_PubsubTarget{
PubsubTarget: &schedulerpb.PubsubTarget{
TopicName: resources.GeneratePubSubTargetTopic(scheduler, topic),
TopicName: pubsubTargetName,
Data: []byte(scheduler.Spec.Data),
Attributes: customAttributes,
},
Expand All @@ -148,6 +148,30 @@ func (r *Reconciler) reconcileJob(ctx context.Context, scheduler *v1beta1.CloudS
return err
}
}
// TODO remove after 0.16 cut.
actualTarget := job.GetPubsubTarget()
if actualTarget != nil && actualTarget.TopicName != pubsubTargetName {
// This means that it is using a topic with an old name. We will update the target.
_, err = client.UpdateJob(ctx, &schedulerpb.UpdateJobRequest{
Job: &schedulerpb.Job{
Name: job.Name,
Target: &schedulerpb.Job_PubsubTarget{
PubsubTarget: &schedulerpb.PubsubTarget{
TopicName: pubsubTargetName,
Data: actualTarget.Data,
Attributes: actualTarget.Attributes,
},
},
// Needed to add these two here otherwise I was getting an update error.
Schedule: job.Schedule,
TimeZone: job.TimeZone,
},
})
if err != nil {
logging.FromContext(ctx).Desugar().Error("Failed to update old CloudSchedulerSource job", zap.String("jobName", jobName), zap.Error(err))
return err
}
}
return nil
}

Expand Down
14 changes: 13 additions & 1 deletion pkg/reconciler/events/storage/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,19 @@ func (r *Reconciler) reconcileNotification(ctx context.Context, storage *v1beta1

// If the notification does exist, then return its ID.
if existing, ok := notifications[storage.Status.NotificationID]; ok {
return existing.ID, nil
// TODO remove after the 0.16 cut.
// If the notification exists, need to check whether it is using the updated topic name.
// If not, then we delete it and create it again.
if existing.TopicID != storage.Status.TopicID {
err := bucket.DeleteNotification(ctx, storage.Status.NotificationID)
if err != nil {
logging.FromContext(ctx).Desugar().Error("Failed to delete old CloudStorageSource notification", zap.Error(err))
return "", err
}
// We let the creation to happen after this enclosing if, thus we do not return here and need this other else.
} else {
return existing.ID, nil
}
}

// If the notification does not exist, then create it.
Expand Down
34 changes: 33 additions & 1 deletion pkg/reconciler/intevents/reconciler.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ import (
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/equality"
apierrs "k8s.io/apimachinery/pkg/api/errors"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1"
"knative.dev/pkg/apis"
"knative.dev/pkg/logging"
pkgreconciler "knative.dev/pkg/reconciler"
Expand Down Expand Up @@ -106,6 +106,22 @@ func (psb *PubSubBase) reconcileTopic(ctx context.Context, pubsubable duck.PubSu
} else if err != nil {
logging.FromContext(ctx).Desugar().Error("Failed to get Topic", zap.Error(err))
return nil, fmt.Errorf("failed to get Topic: %w", err)
// TODO remove this else if after 0.16 cut.
} else if newTopic.Spec.Topic != t.Spec.Topic {
// We check whether the topic changed. This can only happen when updating to 0.16 as the spec.topic is immutable.
// We have to delete the oldTopic and create a new one here.
logging.FromContext(ctx).Desugar().Info("Deleting old Topic", zap.Any("topic", t))
err = topics.Delete(t.Name, nil)
if err != nil {
logging.FromContext(ctx).Desugar().Error("Failed to delete old Topic", zap.Any("topic", t), zap.Error(err))
return nil, fmt.Errorf("failed to delete Topic: %w", err)
}
logging.FromContext(ctx).Desugar().Debug("Creating new Topic", zap.Any("topic", newTopic))
t, err = topics.Create(newTopic)
if err != nil {
logging.FromContext(ctx).Desugar().Error("Failed to create Topic", zap.Any("topic", newTopic), zap.Error(err))
return nil, fmt.Errorf("failed to create Topic: %w", err)
}
// Check whether the specs differ and update the Topic if so.
} else if !equality.Semantic.DeepDerivative(newTopic.Spec, t.Spec) {
// Don't modify the informers copy.
Expand Down Expand Up @@ -167,6 +183,22 @@ func (psb *PubSubBase) ReconcilePullSubscription(ctx context.Context, pubsubable
logging.FromContext(ctx).Desugar().Error("Failed to create PullSubscription", zap.Any("ps", newPS), zap.Error(err))
return nil, pkgreconciler.NewEvent(corev1.EventTypeWarning, pullSubscriptionCreateFailedReason, "Creating PullSubscription failed with: %s", err.Error())
}
// TODO remove this else if after 0.16 cut.
} else if newPS.Spec.Topic != ps.Spec.Topic {
// We check whether the topic changed. This can only happen when updating to 0.16 as the spec.topic is immutable.
// We have to delete the old PS and create a new one here.
logging.FromContext(ctx).Desugar().Info("Deleting old PullSubscription", zap.Any("ps", ps))
err = pullSubscriptions.Delete(ps.Name, nil)
if err != nil {
logging.FromContext(ctx).Desugar().Error("Failed to delete old PullSubscription", zap.Any("ps", ps), zap.Error(err))
return nil, fmt.Errorf("failed to delete Pullsubscription: %w", err)
}
logging.FromContext(ctx).Desugar().Debug("Creating new PullSubscription", zap.Any("ps", newPS))
ps, err = pullSubscriptions.Create(newPS)
if err != nil {
logging.FromContext(ctx).Desugar().Error("Failed to create PullSubscription", zap.Any("ps", newPS), zap.Error(err))
return nil, fmt.Errorf("failed to create PullSubscription: %w", err)
}
// Check whether the specs differ and update the PS if so.
} else if !equality.Semantic.DeepDerivative(newPS.Spec, ps.Spec) {
// Don't modify the informers copy.
Expand Down
1 change: 0 additions & 1 deletion pkg/reconciler/intevents/topic/topic.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,6 @@ func (r *Reconciler) ReconcileKind(ctx context.Context, topic *v1beta1.Topic) re

// If enablePublisher is false, then skip creating the publisher.
if enablePublisher := topic.Spec.EnablePublisher; enablePublisher != nil && !*enablePublisher {
// TODO delete previous publishers before the 0.16 cut: https://github.com/google/knative-gcp/issues/1217
return reconciler.NewEvent(corev1.EventTypeNormal, reconciledSuccessReason, `Topic reconciled: "%s/%s"`, topic.Namespace, topic.Name)
}

Expand Down
98 changes: 72 additions & 26 deletions pkg/reconciler/messaging/channel/channel.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import (
"go.uber.org/zap"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/equality"
apierrors "k8s.io/apimachinery/pkg/api/errors"
apierrs "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
Expand Down Expand Up @@ -213,6 +212,22 @@ func (r *Reconciler) syncSubscribers(ctx context.Context, channel *v1beta1.Chann
return err
}
r.Recorder.Eventf(channel, corev1.EventTypeNormal, "SubscriberCreated", "Created Subscriber %q", ps.Name)
// TODO remove this else if after 0.16 cut.
} else if ps.Spec.Topic != existingPs.Spec.Topic {
// We check whether the topic changed. This can only happen when updating to 0.16 as the spec.topic is immutable.
// We have to delete the old PS and create a new one here.
logging.FromContext(ctx).Desugar().Info("Deleting old PullSubscription", zap.Any("ps", existingPs))
err := r.RunClientSet.InternalV1beta1().PullSubscriptions(channel.Namespace).Delete(existingPs.Name, nil)
if err != nil {
logging.FromContext(ctx).Desugar().Error("Failed to delete old PullSubscription", zap.Any("ps", existingPs), zap.Error(err))
return fmt.Errorf("failed to delete Pullsubscription: %w", err)
}
logging.FromContext(ctx).Desugar().Debug("Creating new PullSubscription", zap.Any("ps", ps))
ps, err = r.RunClientSet.InternalV1beta1().PullSubscriptions(channel.Namespace).Create(ps)
if err != nil {
logging.FromContext(ctx).Desugar().Error("Failed to create PullSubscription", zap.Any("ps", ps), zap.Error(err))
return fmt.Errorf("failed to create PullSubscription: %w", err)
}
} else if !equality.Semantic.DeepEqual(ps.Spec, existingPs.Spec) {
// Don't modify the informers copy.
desired := existingPs.DeepCopy()
Expand Down Expand Up @@ -285,23 +300,11 @@ func (r *Reconciler) syncSubscribersStatus(ctx context.Context, channel *v1beta1
}

func (r *Reconciler) reconcileTopic(ctx context.Context, channel *v1beta1.Channel) (*inteventsv1beta1.Topic, error) {
topic, err := r.getTopic(ctx, channel)
if err != nil && !apierrors.IsNotFound(err) {
logging.FromContext(ctx).Desugar().Error("Unable to get a Topic", zap.Error(err))
return nil, err
}
if topic != nil {
if topic.Status.Address != nil {
channel.Status.SetAddress(topic.Status.Address.URL)
} else {
channel.Status.SetAddress(nil)
}
return topic, nil
}
clusterName := channel.GetAnnotations()[duckv1beta1.ClusterNameAnnotation]
name := resources.GeneratePublisherName(channel)
t := resources.MakeTopic(&resources.TopicArgs{
Owner: channel,
Name: resources.GeneratePublisherName(channel),
Name: name,
Project: channel.Spec.Project,
ServiceAccountName: channel.Spec.ServiceAccountName,
Secret: channel.Spec.Secret,
Expand All @@ -310,14 +313,61 @@ func (r *Reconciler) reconcileTopic(ctx context.Context, channel *v1beta1.Channe
Annotations: resources.GetTopicAnnotations(clusterName),
})

topic, err = r.RunClientSet.InternalV1beta1().Topics(channel.Namespace).Create(t)
if err != nil {
logging.FromContext(ctx).Desugar().Error("Failed to create Topic", zap.Error(err))
r.Recorder.Eventf(channel, corev1.EventTypeWarning, "TopicCreateFailed", "Failed to created Topic %q: %s", topic.Name, err.Error())
return nil, err
topic, err := r.getTopic(ctx, channel)
if apierrs.IsNotFound(err) {
topic, err = r.RunClientSet.InternalV1beta1().Topics(channel.Namespace).Create(t)
if err != nil {
logging.FromContext(ctx).Desugar().Error("Failed to create Topic", zap.Error(err))
r.Recorder.Eventf(channel, corev1.EventTypeWarning, "TopicCreateFailed", "Failed to created Topic %q: %s", topic.Name, err.Error())
return nil, err
}
r.Recorder.Eventf(channel, corev1.EventTypeNormal, "TopicCreated", "Created Topic %q", topic.Name)
return topic, nil
} else if err != nil {
logging.FromContext(ctx).Desugar().Error("Failed to get Topic", zap.Error(err))
return nil, fmt.Errorf("failed to get Topic: %w", err)
} else if !metav1.IsControlledBy(topic, channel) {
channel.Status.MarkTopicNotOwned("Topic %q is owned by another resource.", name)
return nil, fmt.Errorf("Channel: %s does not own Topic: %s", channel.Name, name)
// TODO remove this else if after 0.16 cut.
} else if t.Spec.Topic != topic.Spec.Topic {
// We check whether the topic changed. This can only happen when updating to 0.16 as the spec.topic is immutable.
// We have to delete the oldTopic and create a new one here.
logging.FromContext(ctx).Desugar().Info("Deleting old Topic", zap.Any("topic", topic))
err = r.RunClientSet.InternalV1beta1().Topics(channel.Namespace).Delete(topic.Name, nil)
if err != nil {
logging.FromContext(ctx).Desugar().Error("Failed to delete old Topic", zap.Any("topic", topic), zap.Error(err))
return nil, fmt.Errorf("failed to update Topic: %w", err)
}
logging.FromContext(ctx).Desugar().Debug("Creating new Topic", zap.Any("topic", t))
t, err = r.RunClientSet.InternalV1beta1().Topics(channel.Namespace).Create(t)
if err != nil {
logging.FromContext(ctx).Desugar().Error("Failed to create Topic", zap.Any("topic", t), zap.Error(err))
return nil, fmt.Errorf("failed to create Topic: %w", err)
}
return t, nil
} else if !equality.Semantic.DeepDerivative(t.Spec, topic.Spec) {
// Don't modify the informers copy.
desired := topic.DeepCopy()
desired.Spec = t.Spec
logging.FromContext(ctx).Desugar().Debug("Updating Topic", zap.Any("topic", desired))
t, err = r.RunClientSet.InternalV1beta1().Topics(channel.Namespace).Update(desired)
if err != nil {
logging.FromContext(ctx).Desugar().Error("Failed to update Topic", zap.Any("topic", topic), zap.Error(err))
return nil, fmt.Errorf("failed to update Topic: %w", err)
}
return t, nil
}
r.Recorder.Eventf(channel, corev1.EventTypeNormal, "TopicCreated", "Created Topic %q", topic.Name)
return topic, err

if topic != nil {
if topic.Status.Address != nil {
channel.Status.SetAddress(topic.Status.Address.URL)
} else {
channel.Status.SetAddress(nil)
}
}

return topic, nil
}

func (r *Reconciler) getTopic(_ context.Context, channel *v1beta1.Channel) (*inteventsv1beta1.Topic, error) {
Expand All @@ -326,10 +376,6 @@ func (r *Reconciler) getTopic(_ context.Context, channel *v1beta1.Channel) (*int
if err != nil {
return nil, err
}
if !metav1.IsControlledBy(topic, channel) {
channel.Status.MarkTopicNotOwned("Topic %q is owned by another resource.", name)
return nil, fmt.Errorf("Channel: %s does not own Topic: %s", channel.Name, name)
}
return topic, nil
}

Expand Down

0 comments on commit 80454f1

Please sign in to comment.