Skip to content

Commit

Permalink
pmem-csi-operator: delete obsolete objects of a deployment
Browse files Browse the repository at this point in the history
Different operator releases might result in obsolete sub-resources that
were created for a deployment.  Operator shall detect these obsolete
objects if any and delete them in the reconcile loop.

There is no direct API to detect all the objects owned by an object.
So, we solve this by annotating the Deployment CR with the list of
object types & names, and the version of the operator that reconciled.
On operator upgrade/degrade we check if any pre-deployed objects are
still valid.

FIXES intel#595
  • Loading branch information
avalluri committed Sep 1, 2020
1 parent acb46b3 commit 08886bb
Show file tree
Hide file tree
Showing 5 changed files with 279 additions and 27 deletions.
3 changes: 3 additions & 0 deletions pkg/apis/pmemcsi/v1alpha1/deployment_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -244,6 +244,9 @@ func (c DeploymentChange) String() string {

// EnsureDefaults make sure that the deployment object has all defaults set properly
func (d *Deployment) EnsureDefaults(operatorImage string) error {
if d.ObjectMeta.Annotations == nil {
d.ObjectMeta.Annotations = map[string]string{}
}
if d.Spec.Image == "" {
// If provided use operatorImage
if operatorImage != "" {
Expand Down
2 changes: 2 additions & 0 deletions pkg/pmem-csi-operator/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ import (

// ControllerOptions type defintions for options to be passed to reconcile controller
type ControllerOptions struct {
// version holds Operator/Controller version
Version string
// K8sVersion represents version of the running Kubernetes cluster/API server
K8sVersion version.Version
// Namespace to use for namespace-scoped sub-resources created by the controller
Expand Down
249 changes: 241 additions & 8 deletions pkg/pmem-csi-operator/controller/deployment/controller_driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,13 @@ package deployment
import (
"crypto/rsa"
"crypto/tls"
"encoding/json"
"fmt"
"io/ioutil"
"os"
"path"
"runtime"
"strings"

api "github.com/intel/pmem-csi/pkg/apis/pmemcsi/v1alpha1"
grpcserver "github.com/intel/pmem-csi/pkg/grpc-server"
Expand All @@ -25,8 +27,10 @@ import (
rbacv1 "k8s.io/api/rbac/v1"
storagev1beta1 "k8s.io/api/storage/v1beta1"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
apiruntime "k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/util/intstr"
"k8s.io/klog"
)
Expand All @@ -35,14 +39,31 @@ const (
controllerServicePort = 10000
controllerMetricsPort = 10010
nodeControllerPort = 10001

versionAnnotation = "pmem-csi.intel.com/operator-version"
ownedObjectsAnnotation = "pmem-csi.intel.com/owned-objects"
)

var (
roleGVK = rbacv1.SchemeGroupVersion.WithKind("Role")
clusterRoleGVK = rbacv1.SchemeGroupVersion.WithKind("ClusterRole")
roleBindingGVK = rbacv1.SchemeGroupVersion.WithKind("RoleBinding")
clusterRoleBindingGVK = rbacv1.SchemeGroupVersion.WithKind("ClusterRoleBinding")
serviceAccountGVK = corev1.SchemeGroupVersion.WithKind("ServiceAccount")
secretGVK = corev1.SchemeGroupVersion.WithKind("Secret")
serviceGVK = corev1.SchemeGroupVersion.WithKind("Service")
daemonSetGVK = appsv1.SchemeGroupVersion.WithKind("DaemonSet")
statefulSetGVK = appsv1.SchemeGroupVersion.WithKind("StatefulSet")
csiDriverGVK = storagev1beta1.SchemeGroupVersion.WithKind("CSIDriver")
)

type PmemCSIDriver struct {
*api.Deployment
// operators namespace used for creating sub-resources
namespace string

k8sVersion version.Version
k8sVersion version.Version
needsUpdate bool
}

// Reconcile reconciles the driver deployment
Expand Down Expand Up @@ -198,23 +219,152 @@ func (d *PmemCSIDriver) reconcileDeploymentChanges(r *ReconcileDeployment, chang
}
}

obsoleteList, err := d.getObsoleteObjects(r.operatorVersion)
if err != nil {
klog.Warningf("Failed to get obsolete objects for deployment %q: %v", d.Name, err)
}
for _, o := range obsoleteList {
if err := r.Delete(o); err != nil && !errors.IsNotFound(err) {
klog.Warningf("Failed to delete obsolete object %T for deployment %q: %v", o, d.GetName(), err)
}
}

if err := d.tagOwnedObjects(r.operatorVersion); err != nil {
klog.Warningf("Internal error while updating the annotation %s: %v", d.GetName(), err)
}

return false, nil
}

func (d *PmemCSIDriver) deployObjects(r *ReconcileDeployment) error {
objects, err := d.getDeploymentObjects()
if err != nil {
return err
// tagOwnedObjects updates the deployment's annotation with the information(object name:GVK)
// of the objects it owned as a json string.
func (d *PmemCSIDriver) tagOwnedObjects(operatorVersion string) error {
nameGVKs := []string{}

// Nothing to update if this deployment is created by the same operator version
if d.ObjectMeta.Annotations[versionAnnotation] == operatorVersion {
return nil
}
for _, obj := range objects {
if err := r.Create(obj); err != nil {

for _, obj := range d.getDeploymentOwnedObjects() {
metaObj, err := meta.Accessor(obj)
if err != nil {
return err
}
gvk := obj.GetObjectKind().GroupVersionKind().String()
nameGVKs = append(nameGVKs, metaObj.GetName()+":"+gvk)
}

jsonString, err := json.Marshal(nameGVKs)
if err != nil {
return err
}

d.ObjectMeta.Annotations[versionAnnotation] = operatorVersion
d.ObjectMeta.Annotations[ownedObjectsAnnotation] = string(jsonString)
d.needsUpdate = true

return nil
}

// getDeploymentObjects returns all objects that are part of a driver deployment.
// retrieveTaggedObjects retrives the objects tagged for this deployment by the
// operator previously, on it's first successful reconcile
func (d *PmemCSIDriver) retrieveTaggedObjects() ([]apiruntime.Object, error) {
objects := []apiruntime.Object{}
nameGVKs := []string{}

annotation, ok := d.ObjectMeta.Annotations[ownedObjectsAnnotation]
if !ok {
return objects, nil
}

if err := json.Unmarshal([]byte(annotation), &nameGVKs); err != nil {
return objects, err
}

for i := range nameGVKs {
parts := strings.SplitN(nameGVKs[i], ":", 2)
if len(parts) != 2 {
return nil, fmt.Errorf("failed to parse annototion %q: %s", ownedObjectsAnnotation, nameGVKs[i])
}
gvk, err := parseGroupVersionKind(parts[1])
if err != nil {
return nil, fmt.Errorf("failed to parse annototion %q: %v", ownedObjectsAnnotation, err)
}
objects = append(objects, d.objectForGVK(gvk, parts[0]))
}

return objects, nil
}

// getObsoleteObjects checks if there is any change in operator version compared
// to the one created this deployment. If there is version mismatch then it checks
// if all the tagged objects for this deployment are still valid for the current
// version and list all the obsolete objects that should be deleted
func (d *PmemCSIDriver) getObsoleteObjects(currentOperatorVersion string) ([]apiruntime.Object, error) {
obsoleteList := []apiruntime.Object{}

deploymentVersion, ok := d.ObjectMeta.Annotations[versionAnnotation]
if !ok {
return obsoleteList, nil
}
if deploymentVersion == currentOperatorVersion {
return obsoleteList, nil
}

objsOnCreate, err := d.retrieveTaggedObjects()
if err != nil {
return obsoleteList, err
}
objsNow := d.getDeploymentOwnedObjects()

for _, oThen := range objsOnCreate {
found := false
oThenMeta, err := meta.Accessor(oThen)
if err != nil {
return obsoleteList, err
}
for _, oNow := range objsNow {
if oNow.GetObjectKind().GroupVersionKind() == oThen.GetObjectKind().GroupVersionKind() {
oNowMeta, err := meta.Accessor(oThen)
if err != nil {
return obsoleteList, err
}
if oThenMeta.GetName() == oNowMeta.GetName() {
found = true
break
}
}
}
if !found {
obsoleteList = append(obsoleteList, oThen)
}
}

return obsoleteList, nil
}

// getDeploymentOwnedObjects returns the simple list of object with name and its API GVK
// that are owned(supposed to own) by this deployment.
func (d *PmemCSIDriver) getDeploymentOwnedObjects() []apiruntime.Object {
hyphenedName := d.GetHyphenedName()
return []apiruntime.Object{
d.objectForGVK(secretGVK, hyphenedName+"-registry-secrets"),
d.objectForGVK(secretGVK, hyphenedName+"-node-secrets"),
d.objectForGVK(csiDriverGVK, d.GetName()),
d.objectForGVK(serviceAccountGVK, hyphenedName+"-controller"),
d.objectForGVK(roleGVK, hyphenedName+"-external-provisioner-cfg"),
d.objectForGVK(roleBindingGVK, hyphenedName+"-csi-provisioner-role-cfg"),
d.objectForGVK(clusterRoleGVK, hyphenedName+"-external-provisioner-runner"),
d.objectForGVK(clusterRoleBindingGVK, hyphenedName+"-csi-provisioner-role"),
d.objectForGVK(serviceGVK, hyphenedName+"-controller"),
d.objectForGVK(serviceGVK, hyphenedName+"-metrics"),
d.objectForGVK(statefulSetGVK, hyphenedName+"-controller"),
d.objectForGVK(statefulSetGVK, hyphenedName+"-node"),
}
}

// getDeploymentObjects returns all objects that are to be created for a driver deployment.
func (d *PmemCSIDriver) getDeploymentObjects() ([]apiruntime.Object, error) {
objects, err := d.getSecrets()
if err != nil {
Expand Down Expand Up @@ -1046,6 +1196,69 @@ func (d *PmemCSIDriver) getObjectMeta(name string, isClusterResource bool) metav
return meta
}

func (d *PmemCSIDriver) objectForGVK(gvk schema.GroupVersionKind, name string) apiruntime.Object {
typeMeta := metav1.TypeMeta{
Kind: gvk.Kind,
APIVersion: gvk.GroupVersion().String(),
}
switch gvk {
case roleGVK:
return &rbacv1.Role{
TypeMeta: typeMeta,
ObjectMeta: d.getObjectMeta(name, false),
}
case roleBindingGVK:
return &rbacv1.RoleBinding{
TypeMeta: typeMeta,
ObjectMeta: d.getObjectMeta(name, false),
}
case clusterRoleGVK:
return &rbacv1.ClusterRole{
TypeMeta: typeMeta,
ObjectMeta: d.getObjectMeta(name, true),
}
case clusterRoleBindingGVK:
return &rbacv1.ClusterRoleBinding{
TypeMeta: typeMeta,
ObjectMeta: d.getObjectMeta(name, true),
}
case serviceAccountGVK:
return &corev1.ServiceAccount{
TypeMeta: typeMeta,
ObjectMeta: d.getObjectMeta(name, false),
}
case secretGVK:
return &corev1.Secret{
TypeMeta: typeMeta,
ObjectMeta: d.getObjectMeta(name, false),
}
case serviceGVK:
return &corev1.Service{
TypeMeta: typeMeta,
ObjectMeta: d.getObjectMeta(name, false),
}
case daemonSetGVK:
return &appsv1.DaemonSet{
TypeMeta: typeMeta,
ObjectMeta: d.getObjectMeta(name, false),
}
case statefulSetGVK:
return &appsv1.StatefulSet{
TypeMeta: typeMeta,
ObjectMeta: d.getObjectMeta(name, false),
}
case csiDriverGVK:
return &storagev1beta1.CSIDriver{
TypeMeta: typeMeta,
ObjectMeta: d.getObjectMeta(name, true),
}
default:
klog.Warningf("Ignoring unexpected GVK: %s", gvk.String())
}

return nil
}

func joinMaps(left, right map[string]string) map[string]string {
result := map[string]string{}
for key, value := range left {
Expand All @@ -1056,3 +1269,23 @@ func joinMaps(left, right map[string]string) map[string]string {
}
return result
}

// parseGroupVersionKind parses the gvk string to from a schema.GroupVersionKind object
// valid format is: <AIPGroup>/>Version>,Kind=<kind>
func parseGroupVersionKind(strGVK string) (schema.GroupVersionKind, error) {
gvk := schema.GroupVersionKind{}
parts := strings.SplitN(strGVK, ", Kind=", 2)
if len(parts) != 2 {
return gvk, fmt.Errorf("invalid GroupVersionKind string %q", gvk)
}
gv, err := schema.ParseGroupVersion(parts[0])
if err != nil {
return gvk, err
}

gvk.Group = gv.Group
gvk.Version = gv.Version
gvk.Kind = parts[1]

return gvk, nil
}
Original file line number Diff line number Diff line change
Expand Up @@ -74,11 +74,12 @@ var _ reconcile.Reconciler = &ReconcileDeployment{}

// ReconcileDeployment reconciles a Deployment object
type ReconcileDeployment struct {
client client.Client
evBroadcaster record.EventBroadcaster
evRecorder record.EventRecorder
namespace string
k8sVersion version.Version
client client.Client
evBroadcaster record.EventBroadcaster
evRecorder record.EventRecorder
namespace string
k8sVersion version.Version
operatorVersion string
// container image used for deploying the operator
containerImage string
// known deployments
Expand Down Expand Up @@ -113,13 +114,14 @@ func NewReconcileDeployment(client client.Client, opts pmemcontroller.Controller
evRecorder := evBroadcaster.NewRecorder(scheme.Scheme, corev1.EventSource{Component: "pmem-csi-operator"})

return &ReconcileDeployment{
client: client,
evBroadcaster: evBroadcaster,
evRecorder: evRecorder,
k8sVersion: opts.K8sVersion,
namespace: opts.Namespace,
containerImage: opts.DriverImage,
deployments: map[string]*pmemcsiv1alpha1.Deployment{},
client: client,
evBroadcaster: evBroadcaster,
evRecorder: evRecorder,
k8sVersion: opts.K8sVersion,
operatorVersion: opts.Version,
namespace: opts.Namespace,
containerImage: opts.DriverImage,
deployments: map[string]*pmemcsiv1alpha1.Deployment{},
}, nil
}

Expand Down Expand Up @@ -159,7 +161,9 @@ func (r *ReconcileDeployment) Reconcile(request reconcile.Request) (reconcile.Re
return reconcile.Result{Requeue: false}, nil
}

d := &PmemCSIDriver{deployment, r.namespace, r.k8sVersion}
patch := client.MergeFrom(deployment.DeepCopy())

d := &PmemCSIDriver{deployment, r.namespace, r.k8sVersion, false}

// update status
defer func() {
Expand All @@ -169,6 +173,14 @@ func (r *ReconcileDeployment) Reconcile(request reconcile.Request) (reconcile.Re
klog.Warningf("failed to update status %q for deployment %q: %v",
d.Deployment.Status.Phase, d.Name, statusErr)
}
if d.needsUpdate {
if err := r.client.Patch(context.TODO(), d.Deployment, patch); err != nil {
klog.Warningf("Failed update deployment %q: %s", d.GetName(), err)
} else {
d.needsUpdate = false
}
}

}()

requeue, err = d.Reconcile(r)
Expand Down
Loading

0 comments on commit 08886bb

Please sign in to comment.