Skip to content
This repository has been archived by the owner on Jun 19, 2022. It is now read-only.

Upgrade job (for v0.16.0) that deletes legacy {pullsubscription,topic}.pubsub.cloud.google.com COs #1383

Merged
merged 3 commits into from
Jul 8, 2020
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 42 additions & 0 deletions cmd/upgrade/v0.16.0/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/*
Copyright 2020 Google LLC

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package main

import (
"context"
"fmt"
"os"

"github.com/google/knative-gcp/pkg/client/clientset/versioned"
upgrader "github.com/google/knative-gcp/pkg/upgrader/v0.16.0"
"k8s.io/client-go/kubernetes"
kubeclient "knative.dev/pkg/client/injection/kube/client"
"knative.dev/pkg/injection/clients/dynamicclient"
"knative.dev/pkg/injection/sharedmain"
"knative.dev/pkg/signals"
)

func main() {
ctx := signals.NewContext()
cfg := sharedmain.ParseAndGetConfigOrDie()
ctx = context.WithValue(ctx, kubeclient.Key{}, kubernetes.NewForConfigOrDie(cfg))
ctx = context.WithValue(ctx, dynamicclient.Key{}, versioned.NewForConfigOrDie(cfg))
if err := upgrader.Upgrade(ctx); err != nil {
fmt.Printf("Upgrade failed with: %v\n", err)
os.Exit(1)
}
}
17 changes: 17 additions & 0 deletions config/upgrade/v0.16.0/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
# Upgrade script (optional) to upgrade to v0.16.0 of knative-gcp

Starting from v0.16.0 `pubsub.cloud.google.com` API is no longer supported. This
directory contains a job that deletes the legacy
`{pullsubscription,topic}.pubsub.cloud.google.com` resources in all namespaces.

To run the upgrade script:

```shell
kubectl apply -f https://github.com/google/knative-gcp/releases/download/v0.16.0/upgrade-to-v0.16.0.yaml
```

It will create a job called v0.16.0-upgrade in the `cloud-run-events` namespace.
If you installed to a different namespace, you need to modify the upgrade.yaml
appropriately. Also the job by default runs as `controller` service account, you
can also modify that but the service account will need to have permissions to
list `Namespace`s, delete `{pullsubscription,topic}.pubsub.cloud.google.com`s.
18 changes: 18 additions & 0 deletions config/upgrade/v0.16.0/upgrade.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
apiVersion: batch/v1
kind: Job
metadata:
name: v0.16.0-upgrade
namespace: cloud-run-events
labels:
events.cloud.google.com/release: devel
spec:
template:
metadata:
annotations:
sidecar.istio.io/inject: "false"
spec:
serviceAccountName: controller
restartPolicy: Never
containers:
- name: upgrade-brokers
image: ko://github.com/google/knative-gcp/cmd/upgrade/v0.16.0
7 changes: 6 additions & 1 deletion hack/release.sh
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@

source $(dirname "$0")/../vendor/knative.dev/test-infra/scripts/release.sh

readonly UPGRADE_JOB_V_0_16="upgrade-to-v0.16.0.yaml"

# Yaml files to generate, and the source config dir for them.
declare -A COMPONENTS
COMPONENTS=(
Expand All @@ -39,8 +41,11 @@ function build_release() {
LABEL_YAML_CMD=(cat)
fi

# Create v0.16.0 upgrade job yaml
ko resolve ${KO_FLAGS} -f config/upgrade/v0.16.0/ | "${LABEL_YAML_CMD[@]}" > "${UPGRADE_JOB_V_0_16}"

# Build the components
local all_yamls=()
local all_yamls=(${UPGRADE_JOB_V_0_16})
for yaml in "${!COMPONENTS[@]}"; do
local config="${COMPONENTS[${yaml}]}"
echo "Building Cloud Run Events Components - ${config}"
Expand Down
93 changes: 93 additions & 0 deletions pkg/upgrader/v0.16.0/upgrader.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
/*
Copyright 2020 Google LLC

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package upgrader

import (
"context"
"fmt"

"go.uber.org/zap"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/client-go/dynamic"
duckapis "knative.dev/pkg/apis"
kubeclient "knative.dev/pkg/client/injection/kube/client"
"knative.dev/pkg/injection/clients/dynamicclient"
"knative.dev/pkg/logging"
)

// Upgrade deletes legacy resources {pullsubscription,topic}.pubsub.cloud.google.com in all namespaces.
// It's equivalent to:
// kubectl delete pullsubscriptions.pubsub.cloud.google.com --all-namespaces --all
// kubectl delete topics.pubsub.cloud.google.com --all-namespaces --all
func Upgrade(ctx context.Context) error {
nsClient := kubeclient.Get(ctx).CoreV1().Namespaces()
namespaces, err := nsClient.List(metav1.ListOptions{})
if err != nil {
return fmt.Errorf("failed to list namespaces: %w", err)
}
for _, ns := range namespaces.Items {
if err := processNamespace(ctx, ns.Name); err != nil {
return err
}
}
return nil
}

func processNamespace(ctx context.Context, ns string) error {
logger := logging.FromContext(ctx)
logger.Infof("Processing namespace: %q", ns)

tc, err := topicClient(ctx, ns)
if err != nil {
return err
}

psc, err := pullSubClient(ctx, ns)
if err != nil {
return err
}

logger.Debug("Deleting topics", zap.String("namespace", ns))
if err := tc.DeleteCollection(&metav1.DeleteOptions{}, metav1.ListOptions{}); err != nil {
return fmt.Errorf("failed to delete topics in namespace %q: %w", ns, err)
}

logger.Debug("Deleting pullsubscriptions", zap.String("namespace", ns))
if err := psc.DeleteCollection(&metav1.DeleteOptions{}, metav1.ListOptions{}); err != nil {
return fmt.Errorf("failed to delete pullsubscriptions in namespace %q: %w", ns, err)
}

return nil
}

func topicClient(ctx context.Context, namespace string) (dynamic.ResourceInterface, error) {
return resourceInterface(dynamicclient.Get(ctx), namespace, schema.FromAPIVersionAndKind("pubsub.cloud.google.com/v1alpha1", "Topic"))
}

func pullSubClient(ctx context.Context, namespace string) (dynamic.ResourceInterface, error) {
return resourceInterface(dynamicclient.Get(ctx), namespace, schema.FromAPIVersionAndKind("pubsub.cloud.google.com/v1alpha1", "PullSubscription"))
}

func resourceInterface(dynamicClient dynamic.Interface, namespace string, gvk schema.GroupVersionKind) (dynamic.ResourceInterface, error) {
rc := dynamicClient.Resource(duckapis.KindToResource(gvk))

if rc == nil {
return nil, fmt.Errorf("failed to create dynamic client for gvk: %s", gvk)
}
return rc.Namespace(namespace), nil
}
177 changes: 177 additions & 0 deletions pkg/upgrader/v0.16.0/upgrader_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,177 @@
/*
Copyright 2020 Google LLC

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package upgrader

import (
"context"
"fmt"
"testing"

"github.com/google/go-cmp/cmp"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/client-go/kubernetes/fake"
clientgotesting "k8s.io/client-go/testing"
kubeclient "knative.dev/pkg/client/injection/kube/client"
fakedynamicclient "knative.dev/pkg/injection/clients/dynamicclient/fake"
)

type obj struct {
apiVersion string
kind string
namespace string
name string
}

type deletion struct {
GroupVersionResource schema.GroupVersionResource
Namespace string
}

func TestUpgrade(t *testing.T) {
ns1, ns2 := "ns1", "ns2"
cases := []struct {
name string
objs []obj
listNamespaceErr bool
deleteErrLegacyResource string
wantDeletes []deletion
wantErr bool
}{{
name: "success",
objs: []obj{
{apiVersion: "pubsub.cloud.google.com/v1alpha1", kind: "Topic", namespace: ns1, name: "t1"},
{apiVersion: "pubsub.cloud.google.com/v1alpha1", kind: "Topic", namespace: ns1, name: "t2"},
{apiVersion: "pubsub.cloud.google.com/v1alpha1", kind: "Topic", namespace: ns2, name: "t3"},
{apiVersion: "pubsub.cloud.google.com/v1alpha1", kind: "PullSubscription", namespace: ns1, name: "sub1"},
{apiVersion: "pubsub.cloud.google.com/v1alpha1", kind: "PullSubscription", namespace: ns2, name: "sub2"},
{apiVersion: "pubsub.cloud.google.com/v1alpha1", kind: "PullSubscription", namespace: ns2, name: "sub3"},
},
wantDeletes: []deletion{
{GroupVersionResource: schema.GroupVersionResource{Group: "pubsub.cloud.google.com", Version: "v1alpha1", Resource: "topics"}, Namespace: ns1},
{GroupVersionResource: schema.GroupVersionResource{Group: "pubsub.cloud.google.com", Version: "v1alpha1", Resource: "pullsubscriptions"}, Namespace: ns1},
{GroupVersionResource: schema.GroupVersionResource{Group: "pubsub.cloud.google.com", Version: "v1alpha1", Resource: "topics"}, Namespace: ns2},
{GroupVersionResource: schema.GroupVersionResource{Group: "pubsub.cloud.google.com", Version: "v1alpha1", Resource: "pullsubscriptions"}, Namespace: ns2},
},
}, {
name: "list namespace failure",
objs: []obj{
{apiVersion: "pubsub.cloud.google.com/v1alpha1", kind: "Topic", namespace: ns1, name: "t1"},
{apiVersion: "pubsub.cloud.google.com/v1alpha1", kind: "Topic", namespace: ns1, name: "t2"},
{apiVersion: "pubsub.cloud.google.com/v1alpha1", kind: "Topic", namespace: ns2, name: "t3"},
{apiVersion: "pubsub.cloud.google.com/v1alpha1", kind: "PullSubscription", namespace: ns1, name: "sub1"},
{apiVersion: "pubsub.cloud.google.com/v1alpha1", kind: "PullSubscription", namespace: ns2, name: "sub2"},
{apiVersion: "pubsub.cloud.google.com/v1alpha1", kind: "PullSubscription", namespace: ns2, name: "sub3"},
},
listNamespaceErr: true,
wantDeletes: make([]deletion, 0),
wantErr: true,
}, {
name: "delete topics failure",
objs: []obj{
{apiVersion: "pubsub.cloud.google.com/v1alpha1", kind: "Topic", namespace: ns1, name: "t1"},
{apiVersion: "pubsub.cloud.google.com/v1alpha1", kind: "Topic", namespace: ns1, name: "t2"},
{apiVersion: "pubsub.cloud.google.com/v1alpha1", kind: "Topic", namespace: ns2, name: "t3"},
{apiVersion: "pubsub.cloud.google.com/v1alpha1", kind: "PullSubscription", namespace: ns1, name: "sub1"},
{apiVersion: "pubsub.cloud.google.com/v1alpha1", kind: "PullSubscription", namespace: ns2, name: "sub2"},
{apiVersion: "pubsub.cloud.google.com/v1alpha1", kind: "PullSubscription", namespace: ns2, name: "sub3"},
},
deleteErrLegacyResource: "topics",
wantDeletes: []deletion{
{GroupVersionResource: schema.GroupVersionResource{Group: "pubsub.cloud.google.com", Version: "v1alpha1", Resource: "topics"}, Namespace: ns1},
},
wantErr: true,
}, {
name: "delete pullsubscriptions failure",
objs: []obj{
{apiVersion: "pubsub.cloud.google.com/v1alpha1", kind: "Topic", namespace: ns1, name: "t1"},
{apiVersion: "pubsub.cloud.google.com/v1alpha1", kind: "Topic", namespace: ns1, name: "t2"},
{apiVersion: "pubsub.cloud.google.com/v1alpha1", kind: "Topic", namespace: ns2, name: "t3"},
{apiVersion: "pubsub.cloud.google.com/v1alpha1", kind: "PullSubscription", namespace: ns1, name: "sub1"},
{apiVersion: "pubsub.cloud.google.com/v1alpha1", kind: "PullSubscription", namespace: ns2, name: "sub2"},
{apiVersion: "pubsub.cloud.google.com/v1alpha1", kind: "PullSubscription", namespace: ns2, name: "sub3"},
},
deleteErrLegacyResource: "pullsubscriptions",
wantDeletes: []deletion{
{GroupVersionResource: schema.GroupVersionResource{Group: "pubsub.cloud.google.com", Version: "v1alpha1", Resource: "topics"}, Namespace: ns1},
{GroupVersionResource: schema.GroupVersionResource{Group: "pubsub.cloud.google.com", Version: "v1alpha1", Resource: "pullsubscriptions"}, Namespace: ns1},
},
wantErr: true,
}}

for _, tc := range cases {
t.Run(tc.name, func(t *testing.T) {
legacyResources := make([]runtime.Object, 0)
for _, o := range tc.objs {
legacyResources = append(legacyResources, newUnstructured(o))
}

ctx, dc := fakedynamicclient.With(context.Background(), runtime.NewScheme(), legacyResources...)
kc := fake.NewSimpleClientset(
&corev1.Namespace{ObjectMeta: metav1.ObjectMeta{Name: ns1}},
&corev1.Namespace{ObjectMeta: metav1.ObjectMeta{Name: ns2}},
)
ctx = context.WithValue(ctx, kubeclient.Key{}, kc)

gotDeletes := make([]deletion, 0)
dc.PrependReactor("delete-collection", "*", func(action clientgotesting.Action) (handled bool, ret runtime.Object, err error) {
a := action.(clientgotesting.DeleteCollectionAction)
gotDelete := deletion{
GroupVersionResource: a.GetResource(),
Namespace: a.GetNamespace(),
}
gotDeletes = append(gotDeletes, gotDelete)
if a.GetResource().Resource == tc.deleteErrLegacyResource {
return true, nil, fmt.Errorf("inducing failure for %s %s", action.GetVerb(), action.GetResource().Resource)
}
return true, nil, nil
})
if tc.listNamespaceErr {
kc.PrependReactor("list", "namespaces", func(action clientgotesting.Action) (handled bool, ret runtime.Object, err error) {
return true, nil, fmt.Errorf("inducing failure for %s %s", action.GetVerb(), action.GetResource().Resource)
})
}

err := Upgrade(ctx)
if tc.wantErr != (err != nil) {
t.Errorf("Upgrade want error got=%v, want=%v", (err != nil), tc.wantErr)
}
if diff := cmp.Diff(tc.wantDeletes, gotDeletes); diff != "" {
t.Errorf("Resources deleted (-want,+got): %v", diff)
}
})
}
}

func newUnstructured(o obj) *unstructured.Unstructured {
u := &unstructured.Unstructured{
Object: map[string]interface{}{
"apiVersion": o.apiVersion,
"kind": o.kind,
"metadata": map[string]interface{}{
"namespace": o.namespace,
"name": o.name,
},
"spec": map[string]interface{}{},
"status": map[string]interface{}{},
},
}
return u
}