From 48fed15fcebd55a75da8d9562f1e6227a67fae1a Mon Sep 17 00:00:00 2001 From: Nikhil-Ladha Date: Mon, 8 Jul 2024 17:41:40 +0530 Subject: [PATCH] groupreplication: add controller logic for volume group replication added controller logic for volume group replication Signed-off-by: Nikhil-Ladha --- .../v1alpha1/volumegroupreplication_types.go | 9 +- .../volumegroupreplicationcontent_types.go | 5 +- cmd/manager/main.go | 6 +- controllers/replication.storage/finalizers.go | 97 +++- controllers/replication.storage/pvc.go | 39 +- controllers/replication.storage/pvc_test.go | 8 +- controllers/replication.storage/utils.go | 85 +++ .../volumegroupreplication_controller.go | 541 +++++++++++++++++- .../volumegroupreplication_test.go | 187 ++++++ .../volumegroupreplicationclass.go | 44 ++ .../volumegroupreplicationclass_test.go | 78 +++ ...olumegroupreplicationcontent_controller.go | 158 ++++- .../volumereplication_controller.go | 182 ++---- 13 files changed, 1249 insertions(+), 190 deletions(-) create mode 100644 controllers/replication.storage/utils.go create mode 100644 controllers/replication.storage/volumegroupreplication_test.go create mode 100644 controllers/replication.storage/volumegroupreplicationclass.go create mode 100644 controllers/replication.storage/volumegroupreplicationclass_test.go diff --git a/apis/replication.storage/v1alpha1/volumegroupreplication_types.go b/apis/replication.storage/v1alpha1/volumegroupreplication_types.go index 8ad2bff70..6a2b1d60e 100644 --- a/apis/replication.storage/v1alpha1/volumegroupreplication_types.go +++ b/apis/replication.storage/v1alpha1/volumegroupreplication_types.go @@ -21,6 +21,10 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) +const ( + VolumeGroupReplicationNameAnnotation = "replication.storage.openshift.io/volume-group-replication-name" +) + // VolumeGroupReplicationSpec defines the desired state of VolumeGroupReplication type VolumeGroupReplicationSpec struct { // volumeGroupReplicationClassName is the volumeGroupReplicationClass name for this VolumeGroupReplication resource @@ -28,9 +32,10 @@ type VolumeGroupReplicationSpec struct { // +kubebuilder:validation:XValidation:rule="self == oldSelf",message="volumeGroupReplicationClassName is immutable" VolumeGroupReplicationClassName string `json:"volumeGroupReplicationClassName"` - // volumeReplicationClassName is the volumeReplicationClass name for VolumeReplication object + // volumeReplicationClassName is the volumeReplicationClass name for the VolumeReplication object + // created for this volumeGroupReplication // +kubebuilder:validation:Required - // +kubebuilder:validation:XValidation:rule="self == oldSelf",message="volumReplicationClassName is immutable" + // +kubebuilder:validation:XValidation:rule="self == oldSelf",message="volumeReplicationClassName is immutable" VolumeReplicationClassName string `json:"volumeReplicationClassName"` // Name of the VolumeReplication object created for this volumeGroupReplication diff --git a/apis/replication.storage/v1alpha1/volumegroupreplicationcontent_types.go b/apis/replication.storage/v1alpha1/volumegroupreplicationcontent_types.go index fcbb7a95a..d2d063935 100644 --- a/apis/replication.storage/v1alpha1/volumegroupreplicationcontent_types.go +++ b/apis/replication.storage/v1alpha1/volumegroupreplicationcontent_types.go @@ -52,9 +52,8 @@ type VolumeGroupReplicationContentSpec struct { // +optional VolumeGroupReplicationClassName string `json:"volumeGroupReplicationClassName"` - // Source specifies whether the snapshot is (or should be) dynamically provisioned + // Source specifies whether the volume is (or should be) dynamically provisioned // or already exists, and just requires a Kubernetes object representation. - // This field is immutable after creation. // Required. Source VolumeGroupReplicationContentSource `json:"source"` } @@ -68,7 +67,7 @@ type VolumeGroupReplicationContentSource struct { // VolumeGroupReplicationContentStatus defines the status of VolumeGroupReplicationContent type VolumeGroupReplicationContentStatus struct { - // PersistentVolumeRefList is the list of of PV for the group replication + // PersistentVolumeRefList is the list of PV for the group replication // The maximum number of allowed PV in the group is 100. // +optional PersistentVolumeRefList []corev1.LocalObjectReference `json:"persistentVolumeRefList,omitempty"` diff --git a/cmd/manager/main.go b/cmd/manager/main.go index 4016e0c40..4c0a6a97e 100644 --- a/cmd/manager/main.go +++ b/cmd/manager/main.go @@ -205,8 +205,10 @@ func main() { os.Exit(1) } if err = (&replicationController.VolumeGroupReplicationContentReconciler{ - Client: mgr.GetClient(), - Scheme: mgr.GetScheme(), + Client: mgr.GetClient(), + Scheme: mgr.GetScheme(), + Connpool: connPool, + Timeout: defaultTimeout, }).SetupWithManager(mgr); err != nil { setupLog.Error(err, "unable to create controller", "controller", "VolumeGroupReplicationContent") os.Exit(1) diff --git a/controllers/replication.storage/finalizers.go b/controllers/replication.storage/finalizers.go index c1d6d233a..3f7eb0fed 100644 --- a/controllers/replication.storage/finalizers.go +++ b/controllers/replication.storage/finalizers.go @@ -26,11 +26,13 @@ import ( "github.com/go-logr/logr" corev1 "k8s.io/api/core/v1" + "sigs.k8s.io/controller-runtime/pkg/client" ) const ( volumeReplicationFinalizer = "replication.storage.openshift.io" pvcReplicationFinalizer = "replication.storage.openshift.io/pvc-protection" + vgrReplicationFinalizer = "replication.storage.openshift.io/vgr-protection" ) // addFinalizerToVR adds the VR finalizer on the VolumeReplication instance. @@ -64,31 +66,96 @@ func (r *VolumeReplicationReconciler) removeFinalizerFromVR(logger logr.Logger, return nil } -// addFinalizerToPVC adds the VR finalizer on the PersistentVolumeClaim. -func (r *VolumeReplicationReconciler) addFinalizerToPVC(logger logr.Logger, pvc *corev1.PersistentVolumeClaim) error { - if !slices.Contains(pvc.ObjectMeta.Finalizers, pvcReplicationFinalizer) { - logger.Info("adding finalizer to PersistentVolumeClaim object", "Finalizer", pvcReplicationFinalizer) - pvc.ObjectMeta.Finalizers = append(pvc.ObjectMeta.Finalizers, pvcReplicationFinalizer) - if err := r.Client.Update(context.TODO(), pvc); err != nil { +// AddFinalizerToPVC adds the VR finalizer on the PersistentVolumeClaim. +func AddFinalizerToPVC(client client.Client, logger logr.Logger, pvc *corev1.PersistentVolumeClaim, + replicationFinalizer string) error { + if !slices.Contains(pvc.ObjectMeta.Finalizers, replicationFinalizer) { + logger.Info("adding finalizer to PersistentVolumeClaim object", "Finalizer", replicationFinalizer) + pvc.ObjectMeta.Finalizers = append(pvc.ObjectMeta.Finalizers, replicationFinalizer) + if err := client.Update(context.TODO(), pvc); err != nil { return fmt.Errorf("failed to add finalizer (%s) to PersistentVolumeClaim resource"+ " (%s/%s) %w", - pvcReplicationFinalizer, pvc.Namespace, pvc.Name, err) + replicationFinalizer, pvc.Namespace, pvc.Name, err) } } return nil } -// removeFinalizerFromPVC removes the VR finalizer on PersistentVolumeClaim. -func (r *VolumeReplicationReconciler) removeFinalizerFromPVC(logger logr.Logger, pvc *corev1.PersistentVolumeClaim, -) error { - if slices.Contains(pvc.ObjectMeta.Finalizers, pvcReplicationFinalizer) { - logger.Info("removing finalizer from PersistentVolumeClaim object", "Finalizer", pvcReplicationFinalizer) - pvc.ObjectMeta.Finalizers = util.RemoveFromSlice(pvc.ObjectMeta.Finalizers, pvcReplicationFinalizer) - if err := r.Client.Update(context.TODO(), pvc); err != nil { +// RemoveFinalizerFromPVC removes the VR finalizer on PersistentVolumeClaim. +func RemoveFinalizerFromPVC(client client.Client, logger logr.Logger, pvc *corev1.PersistentVolumeClaim, + replicationFinalizer string) error { + if slices.Contains(pvc.ObjectMeta.Finalizers, replicationFinalizer) { + logger.Info("removing finalizer from PersistentVolumeClaim object", "Finalizer", replicationFinalizer) + pvc.ObjectMeta.Finalizers = util.RemoveFromSlice(pvc.ObjectMeta.Finalizers, replicationFinalizer) + if err := client.Update(context.TODO(), pvc); err != nil { return fmt.Errorf("failed to remove finalizer (%s) from PersistentVolumeClaim resource"+ " (%s/%s), %w", - pvcReplicationFinalizer, pvc.Namespace, pvc.Name, err) + replicationFinalizer, pvc.Namespace, pvc.Name, err) + } + } + + return nil +} + +// AddFinalizerToVGR adds the VGR finalizer on the VolumeGroupReplication resource +func AddFinalizerToVGR(client client.Client, logger logr.Logger, vgr *replicationv1alpha1.VolumeGroupReplication) error { + if !slices.Contains(vgr.ObjectMeta.Finalizers, vgrReplicationFinalizer) { + logger.Info("adding finalizer to volumeGroupReplication object", "Finalizer", vgrReplicationFinalizer) + vgr.ObjectMeta.Finalizers = append(vgr.ObjectMeta.Finalizers, vgrReplicationFinalizer) + if err := client.Update(context.TODO(), vgr); err != nil { + return fmt.Errorf("failed to add finalizer (%s) to VolumeGroupReplication resource"+ + " (%s/%s) %w", + vgrReplicationFinalizer, vgr.Namespace, vgr.Name, err) + } + } + + return nil +} + +// RemoveFinalizerFromVGR removes the VR finalizer from the VolumeGroupReplication instance. +func RemoveFinalizerFromVGR(client client.Client, logger logr.Logger, vgr *replicationv1alpha1.VolumeGroupReplication) error { + if slices.Contains(vgr.ObjectMeta.Finalizers, vgrReplicationFinalizer) { + logger.Info("removing finalizer from volumeGroupReplication object", "Finalizer", vgrReplicationFinalizer) + vgr.ObjectMeta.Finalizers = util.RemoveFromSlice(vgr.ObjectMeta.Finalizers, vgrReplicationFinalizer) + if err := client.Update(context.TODO(), vgr); err != nil { + return fmt.Errorf("failed to remove finalizer (%s) from VolumeGroupReplication resource"+ + " (%s/%s), %w", + vgrReplicationFinalizer, vgr.Namespace, vgr.Name, err) + } + } + + return nil +} + +// addFinalizerToVGRContent adds the VGR finalizer on the VolumeGroupReplicationContent resource +func (r *VolumeGroupReplicationContentReconciler) addFinalizerToVGRContent(logger logr.Logger, + vgrContent *replicationv1alpha1.VolumeGroupReplicationContent) error { + + if !slices.Contains(vgrContent.ObjectMeta.Finalizers, vgrReplicationFinalizer) { + logger.Info("adding finalizer to volumeGroupReplicationContent object", "Finalizer", vgrReplicationFinalizer) + vgrContent.ObjectMeta.Finalizers = append(vgrContent.ObjectMeta.Finalizers, vgrReplicationFinalizer) + if err := r.Client.Update(context.TODO(), vgrContent); err != nil { + return fmt.Errorf("failed to add finalizer (%s) to VolumeGroupReplicationContent resource"+ + " (%s/%s) %w", + vgrReplicationFinalizer, vgrContent.Namespace, vgrContent.Name, err) + } + } + + return nil +} + +// removeFinalizerFromVR removes the VR finalizer from the VolumeReplication instance. +func (r *VolumeGroupReplicationContentReconciler) removeFinalizerFromVGRContent(logger logr.Logger, + vgrContent *replicationv1alpha1.VolumeGroupReplicationContent) error { + + if slices.Contains(vgrContent.ObjectMeta.Finalizers, vgrReplicationFinalizer) { + logger.Info("removing finalizer from volumeGroupReplicationContent object", "Finalizer", vgrReplicationFinalizer) + vgrContent.ObjectMeta.Finalizers = util.RemoveFromSlice(vgrContent.ObjectMeta.Finalizers, vgrReplicationFinalizer) + if err := r.Client.Update(context.TODO(), vgrContent); err != nil { + return fmt.Errorf("failed to remove finalizer (%s) from VolumeGroupReplicationContent resource"+ + " (%s/%s), %w", + vgrReplicationFinalizer, vgrContent.Namespace, vgrContent.Name, err) } } diff --git a/controllers/replication.storage/pvc.go b/controllers/replication.storage/pvc.go index 12145430b..a2f3022ef 100644 --- a/controllers/replication.storage/pvc.go +++ b/controllers/replication.storage/pvc.go @@ -24,6 +24,7 @@ import ( corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/types" + "sigs.k8s.io/controller-runtime/pkg/client" replicationv1alpha1 "github.com/csi-addons/kubernetes-csi-addons/apis/replication.storage/v1alpha1" ) @@ -59,22 +60,27 @@ func (r VolumeReplicationReconciler) getPVCDataSource(logger logr.Logger, req ty return pvc, pv, nil } -// annotatePVCWithOwner will add the VolumeReplication details to the PVC annotations. -func (r *VolumeReplicationReconciler) annotatePVCWithOwner(ctx context.Context, logger logr.Logger, reqOwnerName string, pvc *corev1.PersistentVolumeClaim) error { +// AnnotatePVCWithOwner will add the VolumeReplication/VolumeGroupReplication details to the PVC annotations. +func AnnotatePVCWithOwner(client client.Client, logger logr.Logger, reqOwnerName string, + pvc *corev1.PersistentVolumeClaim, pvcAnnotation string) error { if pvc.ObjectMeta.Annotations == nil { pvc.ObjectMeta.Annotations = map[string]string{} } - currentOwnerName := pvc.ObjectMeta.Annotations[replicationv1alpha1.VolumeReplicationNameAnnotation] + if pvc.ObjectMeta.Annotations[replicationv1alpha1.VolumeReplicationNameAnnotation] != "" && + pvc.ObjectMeta.Annotations[replicationv1alpha1.VolumeGroupReplicationNameAnnotation] != "" { + logger.Info("PVC can't be part of both VolumeGroupReplication and VolumeReplication") + return fmt.Errorf("PVC %q can't be owned by both VolumeReplication and VolumeGroupReplication", pvc.Name) + } + + currentOwnerName := pvc.ObjectMeta.Annotations[pvcAnnotation] if currentOwnerName == "" { logger.Info("setting owner on PVC annotation", "Name", pvc.Name, "owner", reqOwnerName) - pvc.ObjectMeta.Annotations[replicationv1alpha1.VolumeReplicationNameAnnotation] = reqOwnerName - err := r.Update(ctx, pvc) + pvc.ObjectMeta.Annotations[pvcAnnotation] = reqOwnerName + err := client.Update(context.TODO(), pvc) if err != nil { logger.Error(err, "Failed to update PVC annotation", "Name", pvc.Name) - - return fmt.Errorf("failed to update PVC %q annotation for VolumeReplication: %w", - pvc.Name, err) + return fmt.Errorf("failed to update PVC %q annotation for replication: %w", pvc.Name, err) } return nil @@ -86,22 +92,23 @@ func (r *VolumeReplicationReconciler) annotatePVCWithOwner(ctx context.Context, "current owner", currentOwnerName, "requested owner", reqOwnerName) - return fmt.Errorf("PVC %q not owned by VolumeReplication %q", + return fmt.Errorf("PVC %q not owned by correct VolumeReplication/VolumeGroupReplication %q", pvc.Name, reqOwnerName) } return nil } -// removeOwnerFromPVCAnnotation removes the VolumeReplication owner from the PVC annotations. -func (r *VolumeReplicationReconciler) removeOwnerFromPVCAnnotation(ctx context.Context, logger logr.Logger, pvc *corev1.PersistentVolumeClaim) error { - if _, ok := pvc.ObjectMeta.Annotations[replicationv1alpha1.VolumeReplicationNameAnnotation]; ok { - logger.Info("removing owner annotation from PersistentVolumeClaim object", "Annotation", replicationv1alpha1.VolumeReplicationNameAnnotation) - delete(pvc.ObjectMeta.Annotations, replicationv1alpha1.VolumeReplicationNameAnnotation) - if err := r.Client.Update(ctx, pvc); err != nil { +// RemoveOwnerFromPVCAnnotation removes the VolumeReplication/VolumeGroupReplication owner from the PVC annotations. +func RemoveOwnerFromPVCAnnotation(client client.Client, logger logr.Logger, pvc *corev1.PersistentVolumeClaim, + pvcAnnotation string) error { + if _, ok := pvc.ObjectMeta.Annotations[pvcAnnotation]; ok { + logger.Info("removing owner annotation from PersistentVolumeClaim object", "Annotation", pvcAnnotation) + delete(pvc.ObjectMeta.Annotations, pvcAnnotation) + if err := client.Update(context.TODO(), pvc); err != nil { return fmt.Errorf("failed to remove annotation %q from PersistentVolumeClaim "+ "%q %w", - replicationv1alpha1.VolumeReplicationNameAnnotation, pvc.Name, err) + pvcAnnotation, pvc.Name, err) } } diff --git a/controllers/replication.storage/pvc_test.go b/controllers/replication.storage/pvc_test.go index 749a6312b..dc71c34a1 100644 --- a/controllers/replication.storage/pvc_test.go +++ b/controllers/replication.storage/pvc_test.go @@ -226,7 +226,7 @@ func TestVolumeReplicationReconciler_annotatePVCWithOwner(t *testing.T) { ctx := context.TODO() reconciler := createFakeVolumeReplicationReconciler(t, testPVC, volumeReplication) - err := reconciler.annotatePVCWithOwner(ctx, log.FromContext(context.TODO()), vrName, testPVC) + err := AnnotatePVCWithOwner(reconciler.Client, log.FromContext(context.TODO()), vrName, testPVC, replicationv1alpha1.VolumeReplicationNameAnnotation) if tc.errorExpected { assert.Error(t, err) } else { @@ -244,11 +244,11 @@ func TestVolumeReplicationReconciler_annotatePVCWithOwner(t *testing.T) { assert.Equal(t, testPVC.ObjectMeta.Annotations[replicationv1alpha1.VolumeReplicationNameAnnotation], vrName) } - err = reconciler.removeOwnerFromPVCAnnotation(context.TODO(), log.FromContext(context.TODO()), testPVC) + err = RemoveOwnerFromPVCAnnotation(reconciler.Client, log.FromContext(context.TODO()), testPVC, replicationv1alpha1.VolumeReplicationNameAnnotation) assert.NoError(t, err) // try calling delete again, it should not fail - err = reconciler.removeOwnerFromPVCAnnotation(context.TODO(), log.FromContext(context.TODO()), testPVC) + err = RemoveOwnerFromPVCAnnotation(reconciler.Client, log.FromContext(context.TODO()), testPVC, replicationv1alpha1.VolumeReplicationNameAnnotation) assert.NoError(t, err) } @@ -262,6 +262,6 @@ func TestVolumeReplicationReconciler_annotatePVCWithOwner(t *testing.T) { } volumeReplication := &replicationv1alpha1.VolumeReplication{} reconciler := createFakeVolumeReplicationReconciler(t, pvc, volumeReplication) - err := reconciler.removeOwnerFromPVCAnnotation(context.TODO(), log.FromContext(context.TODO()), pvc) + err := RemoveOwnerFromPVCAnnotation(reconciler.Client, log.FromContext(context.TODO()), pvc, replicationv1alpha1.VolumeReplicationNameAnnotation) assert.NoError(t, err) } diff --git a/controllers/replication.storage/utils.go b/controllers/replication.storage/utils.go new file mode 100644 index 000000000..fcad59d2f --- /dev/null +++ b/controllers/replication.storage/utils.go @@ -0,0 +1,85 @@ +/* +Copyright 2024 The Kubernetes-CSI-Addons Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package controllers + +import ( + "context" + "time" + + replicationv1alpha1 "github.com/csi-addons/kubernetes-csi-addons/apis/replication.storage/v1alpha1" + "github.com/go-logr/logr" + "k8s.io/apimachinery/pkg/api/meta" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apimachinery/pkg/runtime/schema" + "sigs.k8s.io/controller-runtime/pkg/client" +) + +func SetFailureCondition(instanceState replicationv1alpha1.ReplicationState, + instanceStatus *replicationv1alpha1.VolumeReplicationStatus, instanceGeneration int64) { + + switch instanceState { + case replicationv1alpha1.Primary: + setFailedPromotionCondition(&instanceStatus.Conditions, instanceGeneration) + case replicationv1alpha1.Secondary: + setFailedDemotionCondition(&instanceStatus.Conditions, instanceGeneration) + case replicationv1alpha1.Resync: + setFailedResyncCondition(&instanceStatus.Conditions, instanceGeneration) + } +} + +func GetReplicationState(instanceState replicationv1alpha1.ReplicationState) replicationv1alpha1.State { + switch instanceState { + case replicationv1alpha1.Primary: + return replicationv1alpha1.PrimaryState + case replicationv1alpha1.Secondary: + return replicationv1alpha1.SecondaryState + case replicationv1alpha1.Resync: + return replicationv1alpha1.SecondaryState + } + + return replicationv1alpha1.UnknownState +} + +func GetCurrentReplicationState(instanceStatusState replicationv1alpha1.State) replicationv1alpha1.State { + if instanceStatusState == "" { + return replicationv1alpha1.UnknownState + } + + return instanceStatusState +} + +func WaitForVolumeReplicationResource(client client.Client, logger logr.Logger, resourceName string) error { + unstructuredResource := &unstructured.UnstructuredList{} + unstructuredResource.SetGroupVersionKind(schema.GroupVersionKind{ + Group: replicationv1alpha1.GroupVersion.Group, + Kind: resourceName, + Version: replicationv1alpha1.GroupVersion.Version, + }) + for { + err := client.List(context.TODO(), unstructuredResource) + if err == nil { + return nil + } + // return errors other than NoMatch + if !meta.IsNoMatchError(err) { + logger.Error(err, "got an unexpected error while waiting for resource", "Resource", resourceName) + return err + } + logger.Info("resource does not exist", "Resource", resourceName) + time.Sleep(5 * time.Second) + } +} diff --git a/controllers/replication.storage/volumegroupreplication_controller.go b/controllers/replication.storage/volumegroupreplication_controller.go index 8b96c5d94..a90882e66 100644 --- a/controllers/replication.storage/volumegroupreplication_controller.go +++ b/controllers/replication.storage/volumegroupreplication_controller.go @@ -18,13 +18,36 @@ package controllers import ( "context" + "fmt" + "reflect" + "strings" + "github.com/csi-addons/kubernetes-csi-addons/controllers/replication.storage/replication" + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/selection" + "k8s.io/apimachinery/pkg/types" ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/builder" "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" + "sigs.k8s.io/controller-runtime/pkg/event" + "sigs.k8s.io/controller-runtime/pkg/handler" "sigs.k8s.io/controller-runtime/pkg/log" + "sigs.k8s.io/controller-runtime/pkg/predicate" + "sigs.k8s.io/controller-runtime/pkg/reconcile" replicationv1alpha1 "github.com/csi-addons/kubernetes-csi-addons/apis/replication.storage/v1alpha1" + "github.com/go-logr/logr" +) + +const ( + volumeGroupReplicationClass = "VolumeGroupReplicationClass" + volumeGroupReplication = "VolumeGroupReplication" + volumeGroupReplicationContent = "VolumeGroupReplicationContent" ) // VolumeGroupReplicationReconciler reconciles a VolumeGroupReplication object @@ -33,21 +56,533 @@ type VolumeGroupReplicationReconciler struct { Scheme *runtime.Scheme } -//+kubebuilder:rbac:groups=replication.storage.openshift.io,resources=volumegroupreplications,verbs=get;list;watch;create;update;patch;delete +//+kubebuilder:rbac:groups=replication.storage.openshift.io,resources=volumegroupreplications,verbs=get;list;watch;update;patch //+kubebuilder:rbac:groups=replication.storage.openshift.io,resources=volumegroupreplications/status,verbs=get;update;patch //+kubebuilder:rbac:groups=replication.storage.openshift.io,resources=volumegroupreplications/finalizers,verbs=update +//+kubebuilder:rbac:groups=replication.storage.openshift.io,resources=volumegroupreplicationclasses,verbs=get;list +//+kubebuilder:rbac:groups=replication.storage.openshift.io,resources=volumegroupreplicationcontents,verbs=get;list;watch;create;update;delete +//+kubebuilder:rbac:groups=replication.storage.openshift.io,resources=volumereplications,verbs=get;list;watch;create;update;delete +//+kubebuilder:rbac:groups=replication.storage.openshift.io,resources=volumereplications/status,verbs=get;list +//+kubebuilder:rbac:groups=core,resources=persistentvolumeclaims;persistentvolumes,verbs=get;list;watch;update +//+kubebuilder:rbac:groups=core,resources=persistentvolumeclaims/finalizers,verbs=update // Reconcile is part of the main kubernetes reconciliation loop which aims to // move the current state of the cluster closer to the desired state. func (r *VolumeGroupReplicationReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { - _ = log.FromContext(ctx) + logger := log.FromContext(ctx, "Request.Name", req.Name, "Request.Namespace", req.Namespace) + + // Fetch VolumeGroupReplication instance + instance := &replicationv1alpha1.VolumeGroupReplication{} + err := r.Client.Get(ctx, req.NamespacedName, instance) + if err != nil { + if errors.IsNotFound(err) { + logger.Info("volumeGroupReplication resource not found") + return reconcile.Result{}, nil + } + return reconcile.Result{}, err + } + + // Get VolumeGroupReplicationClass instance + vgrClassObj, err := r.getVolumeGroupReplicationClass(logger, instance.Spec.VolumeGroupReplicationClassName) + if err != nil { + logger.Error(err, "failed to get volumeGroupReplicationClass resource", "VGRClassName", instance.Spec.VolumeGroupReplicationClassName) + err := r.setGroupReplicationFailure(instance, logger, err) + return reconcile.Result{}, err + } + + err = validatePrefixedParameters(vgrClassObj.Spec.Parameters) + if err != nil { + logger.Error(err, "failed to validate parameters of volumeGroupReplicationClass", "VGRClassName", instance.Spec.VolumeGroupReplicationClassName) + err := r.setGroupReplicationFailure(instance, logger, err) + return reconcile.Result{}, err + } + + // Declare all dependent resources + vgrContentObj := &replicationv1alpha1.VolumeGroupReplicationContent{} + vrObj := &replicationv1alpha1.VolumeReplication{} + + // Create/Update dependent resources only if the instance is not marked for deletion + if instance.GetDeletionTimestamp().IsZero() { + + // Add finalizer to VGR instance + if err = AddFinalizerToVGR(r.Client, logger, instance); err != nil { + logger.Error(err, "failed to add VolumeGroupReplication finalizer") + return reconcile.Result{}, err + } + + // Check if PVCs exist based on provided selectors + pvcList := corev1.PersistentVolumeClaimList{} + if instance.Spec.Source.Selector != nil { + labelSelector := "" + pvcList, labelSelector, err = r.getMatchingPVCsFromSource(instance, logger) + if err != nil { + logger.Error(err, "failed to get PVCs using selector") + err := r.setGroupReplicationFailure(instance, logger, err) + return reconcile.Result{}, err + } + if len(pvcList.Items) == 0 { + logger.Info("No matching PVCs found for the given selectors") + return reconcile.Result{}, nil + } + + // Add the string representation of the labelSelector to the VGR annotation + if instance.ObjectMeta.Annotations == nil { + instance.ObjectMeta.Annotations = make(map[string]string) + } + + if instance.ObjectMeta.Annotations["pvcSelector"] != labelSelector { + instance.ObjectMeta.Annotations["pvcSelector"] = labelSelector + err = r.Client.Update(ctx, instance) + if err != nil { + logger.Error(err, "failed to add pvc selector annotation to VGR") + err := r.setGroupReplicationFailure(instance, logger, err) + return reconcile.Result{}, err + } + } + } else { + logger.Info("No selector provided for PVCs in volumeGroupReplication source") + return reconcile.Result{}, nil + } + + // Update PersistentVolumeClaimsRefList in VGR Status + tmpRefList := []corev1.LocalObjectReference{} + for i, pvc := range pvcList.Items { + if i < 100 { + tmpRefList = append(tmpRefList, corev1.LocalObjectReference{ + Name: pvc.Name, + }) + } else { + logger.Info("More than 100 PVCs match the given selector, using only first 100 PVCs") + break + } + } + + // Annotate each PVC with owner and add finalizer to it + for _, pvc := range pvcList.Items { + err = AnnotatePVCWithOwner(r.Client, logger, instance.Name, &pvc, replicationv1alpha1.VolumeGroupReplicationNameAnnotation) + if err != nil { + logger.Error(err, "Failed to add VGR owner annotation on PVC") + return ctrl.Result{}, err + } + + if err = AddFinalizerToPVC(r.Client, logger, &pvc, vgrReplicationFinalizer); err != nil { + logger.Error(err, "Failed to add VGR finalizer on PersistentVolumeClaim") + return reconcile.Result{}, err + } + } + + if !reflect.DeepEqual(instance.Status.PersistentVolumeClaimsRefList, tmpRefList) { + instance.Status.PersistentVolumeClaimsRefList = tmpRefList + err = r.Client.Status().Update(ctx, instance) + if err != nil { + logger.Error(err, "failed to update VolumeGroupReplication resource") + err := r.setGroupReplicationFailure(instance, logger, err) + return reconcile.Result{}, err + } - return ctrl.Result{}, nil + // Get the PV handles for all the PVCs which are provisioned using the same provisioner + // as mentioned in the VGRClass resource + pvHandlesList, err := r.getPVHandles(*vgrClassObj, pvcList, logger) + if err != nil { + logger.Error(err, "failed to get volume handles from the pvs") + err := r.setGroupReplicationFailure(instance, logger, err) + return reconcile.Result{}, err + } + + // Create/Fetch VolumeGroupReplicationContent CR + if instance.Spec.VolumeGroupReplicationContentName != "" { + err = r.Client.Get(ctx, types.NamespacedName{Name: instance.Spec.VolumeGroupReplicationContentName}, vgrContentObj) + if err != nil { + logger.Error(err, "failed to get volumeGroupReplicationContent resource", "VGRContentName", instance.Spec.VolumeGroupReplicationContentName) + err := r.setGroupReplicationFailure(instance, logger, err) + return reconcile.Result{}, err + } + if !reflect.DeepEqual(pvHandlesList, vgrContentObj.Spec.Source.VolumeHandles) { + vgrContentObj.Spec.Source.VolumeHandles = pvHandlesList + err = r.Client.Update(ctx, vgrContentObj) + if err != nil { + logger.Error(err, "failed to update volumeGroupReplicationContent resource", "VGRContentName", vgrContentObj.Name) + err := r.setGroupReplicationFailure(instance, logger, err) + return reconcile.Result{}, err + } + } + } else { + vgrContentObj = &replicationv1alpha1.VolumeGroupReplicationContent{ + ObjectMeta: metav1.ObjectMeta{ + Name: fmt.Sprintf("vgrcontent-%s", instance.UID), + Annotations: map[string]string{ + "replication.storage.openshift.io/volumegroupref": fmt.Sprintf("%s/%s", instance.Name, instance.Namespace), + }, + }, + Spec: replicationv1alpha1.VolumeGroupReplicationContentSpec{ + VolumeGroupReplicationRef: corev1.ObjectReference{ + APIVersion: instance.APIVersion, + Kind: instance.Kind, + Name: instance.Name, + Namespace: instance.Namespace, + UID: instance.UID, + }, + Provisioner: vgrClassObj.Spec.Provisioner, + VolumeGroupReplicationClassName: instance.Spec.VolumeGroupReplicationClassName, + Source: replicationv1alpha1.VolumeGroupReplicationContentSource{ + VolumeHandles: pvHandlesList, + }, + }, + } + + err = r.Client.Create(ctx, vgrContentObj) + if err != nil { + logger.Error(err, "failed to create volumeGroupReplicationContent", "VGRContentName", vgrContentObj.Name) + err := r.setGroupReplicationFailure(instance, logger, err) + return reconcile.Result{}, err + } + + // Update the VGR with VGRContentName + instance.Spec.VolumeGroupReplicationContentName = vgrContentObj.Name + err = r.Client.Update(ctx, instance) + if err != nil { + logger.Error(err, "failed to update volumeGroupReplication instance", "VGRName", instance.Name) + err := r.setGroupReplicationFailure(instance, logger, err) + return reconcile.Result{}, err + } + } + } + + if vgrContentObj.Spec.VolumeGroupReplicationHandle == "" { + logger.Info("Either volumegroupreplicationcontent is not yet created or it is still grouping the volumes to be replicated") + return reconcile.Result{}, nil + } else { + // Create VolumeReplication CR for the current VGR CR, if doesn't exists + vrName := fmt.Sprintf("vr-%s", instance.UID) + apiGroup := "replication.storage.openshift.io" + vrObj = &replicationv1alpha1.VolumeReplication{ + ObjectMeta: metav1.ObjectMeta{ + Name: vrName, + Namespace: instance.Namespace, + Annotations: map[string]string{ + "replication.storage.openshift.io/volumegroupref": fmt.Sprintf("%s/%s", instance.Name, instance.Namespace), + }, + }, + Spec: replicationv1alpha1.VolumeReplicationSpec{ + VolumeReplicationClass: instance.Spec.VolumeReplicationClassName, + ReplicationState: instance.Spec.ReplicationState, + DataSource: corev1.TypedLocalObjectReference{ + APIGroup: &apiGroup, + Kind: instance.Kind, + Name: instance.Name, + }, + AutoResync: instance.Spec.AutoResync, + }, + } + _ = controllerutil.SetOwnerReference(instance, vrObj, r.Scheme) + foundVRObj := &replicationv1alpha1.VolumeReplication{} + err = r.Client.Get(ctx, types.NamespacedName{Name: vrName, Namespace: instance.Namespace}, foundVRObj) + if err != nil { + if errors.IsNotFound(err) { + logger.Info("volumeReplication CR not found for volumeGroupReplication. Creating new volumeReplication resource", "VRName", vrName) + err = r.Client.Create(ctx, vrObj) + if err != nil { + logger.Error(err, "failed to create volumeReplication CR", "VRName", vrName) + err := r.setGroupReplicationFailure(instance, logger, err) + return reconcile.Result{}, err + } + } else { + logger.Error(err, "failed to get volumeReplication resource", "VRName", vrName) + err := r.setGroupReplicationFailure(instance, logger, err) + return reconcile.Result{}, err + } + } else { + if !reflect.DeepEqual(foundVRObj, vrObj) { + foundVRObj = vrObj + err = r.Client.Update(ctx, foundVRObj) + if err != nil { + logger.Error(err, "failed to update volumeReplication resource", "VRName", vrName) + err := r.setGroupReplicationFailure(instance, logger, err) + return reconcile.Result{}, err + } + } + } + + // Update the VGR with VolumeReplication resource name, if not present + if instance.Spec.VolumeReplicationName == "" { + instance.Spec.VolumeReplicationName = vrObj.Name + err = r.Client.Update(ctx, instance) + if err != nil { + logger.Error(err, "failed to update volumeGroupReplication instance", "VGRName", instance.Name) + err := r.setGroupReplicationFailure(instance, logger, err) + return reconcile.Result{}, err + } + } + } + } else { + if instance.Status.PersistentVolumeClaimsRefList != nil { + for _, pvcRef := range instance.Status.PersistentVolumeClaimsRefList { + pvc := &corev1.PersistentVolumeClaim{} + err = r.Client.Get(ctx, types.NamespacedName{Name: pvcRef.Name}, pvc) + if err != nil { + logger.Error(err, "failed to fetch pvc from VGR status") + return reconcile.Result{}, err + } + + if err = RemoveOwnerFromPVCAnnotation(r.Client, logger, pvc, replicationv1alpha1.VolumeGroupReplicationNameAnnotation); err != nil { + logger.Error(err, "Failed to remove VolumeReplication annotation from PersistentVolumeClaim") + + return reconcile.Result{}, err + } + + if err = RemoveFinalizerFromPVC(r.Client, logger, pvc, vgrReplicationFinalizer); err != nil { + logger.Error(err, "Failed to remove VGR finalizer from PersistentVolumeClaim") + return reconcile.Result{}, err + } + } + } + if vrObj.Name != "" { + err = r.Client.Delete(ctx, vrObj) + if err != nil { + logger.Error(err, "failed to delete dependent volumeReplication resource") + return reconcile.Result{}, err + } + } + + if vgrContentObj.Name != "" { + err = r.Client.Delete(ctx, vgrContentObj) + if err != nil { + logger.Error(err, "failed to delete dependent volumeGroupReplicationContent resource") + return reconcile.Result{}, err + } + } + + if err = RemoveFinalizerFromVGR(r.Client, logger, instance); err != nil { + logger.Error(err, "failed to remove VolumeGroupReplication finalizer") + return reconcile.Result{}, err + } + + return reconcile.Result{}, nil + } + + // Update VGR status based on VR Status + instance.Status.VolumeReplicationStatus = vrObj.Status + err = r.Client.Status().Update(ctx, instance) + if err != nil { + logger.Error(err, "failed to update volumeGroupReplication instance's status", "VGRName", instance.Name) + return reconcile.Result{}, err + } + + return reconcile.Result{}, nil } // SetupWithManager sets up the controller with the Manager. func (r *VolumeGroupReplicationReconciler) SetupWithManager(mgr ctrl.Manager) error { + err := r.waitForGroupCrds() + if err != nil { + return err + } + + // Only reconcile for status update events + skipUpdates := predicate.Funcs{ + UpdateFunc: func(e event.UpdateEvent) bool { + return false + }, + DeleteFunc: func(e event.DeleteEvent) bool { + return false + }, + CreateFunc: func(e event.CreateEvent) bool { + return false + }, + GenericFunc: func(e event.GenericEvent) bool { + return false + }, + } + + enqueueVGRRequest := handler.EnqueueRequestsFromMapFunc( + func(context context.Context, obj client.Object) []reconcile.Request { + // Get the VolumeGroupReplication name,namespace + var vgrName, vgrNamespace string + objAnnotations := obj.GetAnnotations() + for k, v := range objAnnotations { + if k == "replication.storage.openshift.io/volumegroupref" { + vgrName = strings.Split(v, "/")[0] + vgrNamespace = strings.Split(v, "/")[1] + break + } + } + + // Skip reconcile if the triggering resource is not a sub-resource of VGR + if vgrName == "" || vgrNamespace == "" { + return []reconcile.Request{} + } + + // Check if the resource is present in the cluster + vgrObj := &replicationv1alpha1.VolumeGroupReplication{} + logger := log.FromContext(context) + err := r.Client.Get(context, types.NamespacedName{Name: vgrName, Namespace: vgrNamespace}, vgrObj) + if err != nil { + logger.Error(err, "Unable to get VolumeGroupReplication resource") + return []reconcile.Request{} + } + + // Return name and namespace of the VoleumGroupReplication resource + return []reconcile.Request{ + { + NamespacedName: types.NamespacedName{ + Namespace: vgrObj.Namespace, + Name: vgrObj.Name, + }, + }, + } + }, + ) + + enqueueVGRForPVCRequest := handler.EnqueueRequestsFromMapFunc( + func(context context.Context, obj client.Object) []reconcile.Request { + // Check if the PVC has any labels defined + objLabels := obj.GetLabels() + if len(objLabels) == 0 { + return []reconcile.Request{} + } + + // Check if the resource is present in the cluster + vgrObjsList := &replicationv1alpha1.VolumeGroupReplicationList{} + logger := log.FromContext(context) + err := r.Client.List(context, vgrObjsList) + if err != nil { + logger.Error(err, "failed to list VolumeGroupReplication instances") + return []reconcile.Request{} + } + + // Check if the pvc labels match any VGRs based on selectors present in it's annotation + for _, vgr := range vgrObjsList.Items { + if vgr.Annotations != nil && vgr.Annotations["pvcSelector"] != "" { + labelSelector, err := labels.Parse(vgr.Annotations["pvcSelector"]) + if err != nil { + logger.Error(err, "failed to parse selector from VolumeGroupReplication's annotation") + return []reconcile.Request{} + } + objLabelsSet := labels.Set(objLabels) + if labelSelector.Matches(objLabelsSet) { + return []reconcile.Request{ + { + NamespacedName: types.NamespacedName{ + Namespace: vgr.Namespace, + Name: vgr.Name, + }, + }, + } + } + } + } + + return []reconcile.Request{} + }, + ) + return ctrl.NewControllerManagedBy(mgr). For(&replicationv1alpha1.VolumeGroupReplication{}). + Owns(&replicationv1alpha1.VolumeGroupReplicationContent{}, builder.WithPredicates(skipUpdates)). + Owns(&replicationv1alpha1.VolumeReplication{}, builder.WithPredicates(skipUpdates)). + Watches(&replicationv1alpha1.VolumeGroupReplicationContent{}, enqueueVGRRequest). + Watches(&replicationv1alpha1.VolumeReplication{}, enqueueVGRRequest). + Watches(&corev1.PersistentVolumeClaim{}, enqueueVGRForPVCRequest). Complete(r) } + +// waitForGroupCrds waits for dependent CRDs to tbe available in the cluster +func (r *VolumeGroupReplicationReconciler) waitForGroupCrds() error { + logger := log.FromContext(context.TODO(), "Name", "checkingGroupDependencies") + + err := WaitForVolumeReplicationResource(r.Client, logger, volumeGroupReplicationClass) + if err != nil { + logger.Error(err, "failed to wait for VolumeGroupReplicationClass CRD") + return err + } + + err = WaitForVolumeReplicationResource(r.Client, logger, volumeGroupReplication) + if err != nil { + logger.Error(err, "failed to wait for VolumeGroupReplication CRD") + return err + } + + err = WaitForVolumeReplicationResource(r.Client, logger, volumeGroupReplicationContent) + if err != nil { + logger.Error(err, "failed to wait for VolumeGroupReplicationContent CRD") + return err + } + + return nil +} + +// setGroupReplicationFailure sets the failure replication status on the VolumeGroupReplication resource +func (r *VolumeGroupReplicationReconciler) setGroupReplicationFailure( + instance *replicationv1alpha1.VolumeGroupReplication, + logger logr.Logger, err error) error { + + instance.Status.State = GetCurrentReplicationState(instance.Status.State) + instance.Status.Message = replication.GetMessageFromError(err) + instance.Status.ObservedGeneration = instance.Generation + if err := r.Client.Status().Update(context.TODO(), instance); err != nil { + logger.Error(err, "failed to update volumeGroupReplication status", "VGRName", instance.Name) + return err + } + + return nil +} + +// getMatchingPVCsFromSource fecthes the PVCs based on the selectors defined in the VolumeGroupReplication resource +func (r *VolumeGroupReplicationReconciler) getMatchingPVCsFromSource(instance *replicationv1alpha1.VolumeGroupReplication, + logger logr.Logger) (corev1.PersistentVolumeClaimList, string, error) { + + pvcList := &corev1.PersistentVolumeClaimList{} + newSelector := labels.NewSelector() + + if instance.Spec.Source.Selector.MatchLabels != nil { + for key, value := range instance.Spec.Source.Selector.MatchLabels { + req, err := labels.NewRequirement(key, selection.Equals, []string{value}) + if err != nil { + logger.Error(err, "failed to add label selector requirement") + return *pvcList, "", err + } + newSelector = newSelector.Add(*req) + } + } + + if instance.Spec.Source.Selector.MatchExpressions != nil { + for _, labelExp := range instance.Spec.Source.Selector.MatchExpressions { + req, err := labels.NewRequirement(labelExp.Key, selection.Operator(labelExp.Operator), labelExp.Values) + if err != nil { + logger.Error(err, "failed to add label selector requirement") + return *pvcList, "", err + } + newSelector = newSelector.Add(*req) + } + } + opts := []client.ListOption{ + client.MatchingLabelsSelector{Selector: newSelector}, + } + err := r.Client.List(context.TODO(), pvcList, opts...) + if err != nil { + logger.Error(err, "failed to list pvcs with the given selectors") + } + return *pvcList, newSelector.String(), err +} + +// getPVHandles fetches the PV handles for the given PVC list, if the provisioner is same as that of the +// VolumeGroupReplicationClass +func (r *VolumeGroupReplicationReconciler) getPVHandles(vgrClass replicationv1alpha1.VolumeGroupReplicationClass, + pvcList corev1.PersistentVolumeClaimList, logger logr.Logger) ([]string, error) { + + pvHandlesList := []string{} + for _, pvc := range pvcList.Items { + pvName := pvc.Spec.VolumeName + pv := &corev1.PersistentVolume{} + err := r.Client.Get(context.TODO(), types.NamespacedName{Name: pvName}, pv) + if err != nil { + logger.Error(err, "failed to get pv for corresponding pvc", "PVC Name", pvc.Name) + return nil, err + } + if pv.Spec.CSI.Driver == vgrClass.Spec.Provisioner { + pvHandlesList = append(pvHandlesList, pv.Spec.CSI.VolumeHandle) + } + } + + return pvHandlesList, nil +} diff --git a/controllers/replication.storage/volumegroupreplication_test.go b/controllers/replication.storage/volumegroupreplication_test.go new file mode 100644 index 000000000..71f31d070 --- /dev/null +++ b/controllers/replication.storage/volumegroupreplication_test.go @@ -0,0 +1,187 @@ +/* +Copyright 2022 The Kubernetes-CSI-Addons Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package controllers + +import ( + "context" + "testing" + + "github.com/stretchr/testify/assert" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/types" + "sigs.k8s.io/controller-runtime/pkg/client/fake" + "sigs.k8s.io/controller-runtime/pkg/reconcile" + + replicationv1alpha1 "github.com/csi-addons/kubernetes-csi-addons/apis/replication.storage/v1alpha1" +) + +const ( + mockPV = "test-vgr-pv" + mockPVC = "test-vgr-pvc" +) + +var mockVolumeGroupReplicationObj = &replicationv1alpha1.VolumeGroupReplication{ + ObjectMeta: metav1.ObjectMeta{ + Name: "volume-group-replication", + Namespace: mockNamespace, + UID: "testname", + }, + Spec: replicationv1alpha1.VolumeGroupReplicationSpec{ + VolumeGroupReplicationClassName: "volume-group-replication-class", + VolumeReplicationClassName: "volume-replication-class", + Source: replicationv1alpha1.VolumeGroupReplicationSource{ + Selector: &metav1.LabelSelector{ + MatchLabels: map[string]string{ + "test": "vgr_test", + }, + }, + }, + }, +} + +var mockVGRPersistentVolume = &corev1.PersistentVolume{ + ObjectMeta: metav1.ObjectMeta{ + Name: mockPV, + }, + Spec: corev1.PersistentVolumeSpec{ + PersistentVolumeSource: corev1.PersistentVolumeSource{ + CSI: &corev1.CSIPersistentVolumeSource{ + Driver: "test-driver", + VolumeHandle: mockVolumeHandle, + }, + }, + }, +} + +var mockVGRPersistentVolumeClaim = &corev1.PersistentVolumeClaim{ + ObjectMeta: metav1.ObjectMeta{ + Name: mockPVC, + Namespace: mockNamespace, + Labels: map[string]string{ + "test": "vgr_test", + }, + }, + Spec: corev1.PersistentVolumeClaimSpec{ + VolumeName: mockPV, + }, + Status: corev1.PersistentVolumeClaimStatus{ + Phase: corev1.ClaimBound, + }, +} + +func createFakeVolumeGroupReplicationReconciler(t *testing.T, obj ...runtime.Object) VolumeGroupReplicationReconciler { + t.Helper() + scheme := createFakeScheme(t) + vgrInit := &replicationv1alpha1.VolumeGroupReplication{} + vgrContentInit := &replicationv1alpha1.VolumeGroupReplicationContent{} + client := fake.NewClientBuilder().WithScheme(scheme).WithRuntimeObjects(obj...).WithStatusSubresource(vgrInit, vgrContentInit).Build() + + return VolumeGroupReplicationReconciler{ + Client: client, + Scheme: scheme, + } +} + +func TestVolumeGroupReplication(t *testing.T) { + t.Parallel() + testcases := []struct { + name string + pv *corev1.PersistentVolume + pvc *corev1.PersistentVolumeClaim + expectedPVCList []string + pvcFound bool + }{ + { + name: "case 1: matching pvc available", + pv: mockVGRPersistentVolume, + pvc: mockVGRPersistentVolumeClaim, + expectedPVCList: []string{mockPVC}, + pvcFound: true, + }, + { + name: "case 2: matching pvc not found", + pv: mockVGRPersistentVolume, + pvc: &corev1.PersistentVolumeClaim{ + ObjectMeta: metav1.ObjectMeta{ + Name: mockPVC, + Namespace: mockNamespace, + }, + Spec: corev1.PersistentVolumeClaimSpec{ + VolumeName: mockPV, + }, + Status: corev1.PersistentVolumeClaimStatus{ + Phase: corev1.ClaimBound, + }, + }, + expectedPVCList: []string{}, + pvcFound: false, + }, + } + for _, tc := range testcases { + volumeGroupReplication := &replicationv1alpha1.VolumeGroupReplication{} + mockVolumeGroupReplicationObj.DeepCopyInto(volumeGroupReplication) + + volumeGroupReplicationClass := &replicationv1alpha1.VolumeGroupReplicationClass{} + mockVolumeGroupReplicationClassObj.DeepCopyInto(volumeGroupReplicationClass) + + volumeReplicationClass := &replicationv1alpha1.VolumeReplicationClass{} + mockVolumeReplicationClassObj.DeepCopyInto(volumeReplicationClass) + + testPV := &corev1.PersistentVolume{} + tc.pv.DeepCopyInto(testPV) + + testPVC := &corev1.PersistentVolumeClaim{} + tc.pvc.DeepCopyInto(testPVC) + + r := createFakeVolumeGroupReplicationReconciler(t, testPV, testPVC, volumeReplicationClass, volumeGroupReplicationClass, volumeGroupReplication) + nsKey := types.NamespacedName{ + Namespace: volumeGroupReplication.Namespace, + Name: volumeGroupReplication.Name, + } + req := reconcile.Request{ + NamespacedName: nsKey, + } + res, err := r.Reconcile(context.TODO(), req) + assert.Equal(t, reconcile.Result{}, res) + assert.NoError(t, err) + + pvc := &corev1.PersistentVolumeClaim{} + err = r.Client.Get(context.TODO(), types.NamespacedName{Name: testPVC.Name, Namespace: testPVC.Namespace}, pvc) + assert.NoError(t, err) + + vgr := &replicationv1alpha1.VolumeGroupReplication{} + err = r.Client.Get(context.TODO(), nsKey, vgr) + assert.NoError(t, err) + + if tc.pvcFound { + vgrPVCRefList := vgr.Status.PersistentVolumeClaimsRefList + assert.Equal(t, 1, len(vgrPVCRefList)) + for _, pvc := range vgrPVCRefList { + assert.Equal(t, pvc.Name, mockVGRPersistentVolumeClaim.Name) + } + // Check PVC annotation + assert.Equal(t, mockVolumeGroupReplicationObj.Name, pvc.ObjectMeta.Annotations[replicationv1alpha1.VolumeGroupReplicationNameAnnotation]) + // Check VGRContent Created + assert.NotEmpty(t, vgr.Spec.VolumeGroupReplicationContentName) + } else { + assert.Empty(t, volumeGroupReplication.Status.PersistentVolumeClaimsRefList) + assert.Empty(t, vgr.Spec.VolumeGroupReplicationContentName) + } + } +} diff --git a/controllers/replication.storage/volumegroupreplicationclass.go b/controllers/replication.storage/volumegroupreplicationclass.go new file mode 100644 index 000000000..b6ce569d4 --- /dev/null +++ b/controllers/replication.storage/volumegroupreplicationclass.go @@ -0,0 +1,44 @@ +/* +Copyright 2024 The Kubernetes-CSI-Addons Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package controllers + +import ( + "context" + + replicationv1alpha1 "github.com/csi-addons/kubernetes-csi-addons/apis/replication.storage/v1alpha1" + + "github.com/go-logr/logr" + "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/types" +) + +// getVolumeGroupReplicationClass fetches the volumegroupreplicationclass object in the given namespace and return the same. +func (r VolumeGroupReplicationReconciler) getVolumeGroupReplicationClass(logger logr.Logger, vgrClassName string) (*replicationv1alpha1.VolumeGroupReplicationClass, error) { + vgrClassObj := &replicationv1alpha1.VolumeGroupReplicationClass{} + err := r.Client.Get(context.TODO(), types.NamespacedName{Name: vgrClassName}, vgrClassObj) + if err != nil { + if errors.IsNotFound(err) { + logger.Error(err, "VolumeGroupReplicationClass not found", "VolumeGroupReplicationClass", vgrClassName) + } else { + logger.Error(err, "Got an unexpected error while fetching VolumeReplicationClass", "VolumeReplicationClass", vgrClassName) + } + + return nil, err + } + + return vgrClassObj, nil +} diff --git a/controllers/replication.storage/volumegroupreplicationclass_test.go b/controllers/replication.storage/volumegroupreplicationclass_test.go new file mode 100644 index 000000000..aff950b40 --- /dev/null +++ b/controllers/replication.storage/volumegroupreplicationclass_test.go @@ -0,0 +1,78 @@ +/* +Copyright 2024 The Kubernetes-CSI-Addons Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package controllers + +import ( + "context" + "testing" + + replicationv1alpha1 "github.com/csi-addons/kubernetes-csi-addons/apis/replication.storage/v1alpha1" + + "github.com/stretchr/testify/assert" + "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "sigs.k8s.io/controller-runtime/pkg/log" +) + +var mockVolumeGroupReplicationClassObj = &replicationv1alpha1.VolumeGroupReplicationClass{ + ObjectMeta: metav1.ObjectMeta{ + Name: "volume-group-replication-class", + }, + Spec: replicationv1alpha1.VolumeGroupReplicationClassSpec{ + Provisioner: "test-driver", + }, +} + +func TestGetVolumeGroupReplicationClass(t *testing.T) { + t.Parallel() + testcases := []struct { + createVgrc bool + errorExpected bool + isErrorNotFound bool + }{ + {createVgrc: true, errorExpected: false, isErrorNotFound: false}, + {createVgrc: false, errorExpected: true, isErrorNotFound: true}, + } + + for _, tc := range testcases { + var objects []runtime.Object + + volumeGroupReplication := &replicationv1alpha1.VolumeGroupReplication{} + mockVolumeGroupReplicationObj.DeepCopyInto(volumeGroupReplication) + objects = append(objects, volumeGroupReplication) + + if tc.createVgrc { + volumeGroupReplicationClass := &replicationv1alpha1.VolumeGroupReplicationClass{} + mockVolumeGroupReplicationClassObj.DeepCopyInto(volumeGroupReplicationClass) + objects = append(objects, volumeGroupReplicationClass) + } + + reconciler := createFakeVolumeGroupReplicationReconciler(t, objects...) + vgrClassObj, err := reconciler.getVolumeGroupReplicationClass(log.FromContext(context.TODO()), mockVolumeGroupReplicationClassObj.Name) + + if tc.errorExpected { + assert.Error(t, err) + if tc.isErrorNotFound { + assert.True(t, errors.IsNotFound(err)) + } + } else { + assert.NoError(t, err) + assert.NotEqual(t, nil, vgrClassObj) + } + } +} diff --git a/controllers/replication.storage/volumegroupreplicationcontent_controller.go b/controllers/replication.storage/volumegroupreplicationcontent_controller.go index dc91c2c40..24f393afc 100644 --- a/controllers/replication.storage/volumegroupreplicationcontent_controller.go +++ b/controllers/replication.storage/volumegroupreplicationcontent_controller.go @@ -18,19 +18,33 @@ package controllers import ( "context" + "fmt" + "slices" + "time" + replicationv1alpha1 "github.com/csi-addons/kubernetes-csi-addons/apis/replication.storage/v1alpha1" + grpcClient "github.com/csi-addons/kubernetes-csi-addons/internal/client" + conn "github.com/csi-addons/kubernetes-csi-addons/internal/connection" + + "github.com/csi-addons/spec/lib/go/identity" + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/types" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/log" - - replicationv1alpha1 "github.com/csi-addons/kubernetes-csi-addons/apis/replication.storage/v1alpha1" + "sigs.k8s.io/controller-runtime/pkg/reconcile" ) // VolumeGroupReplicationContentReconciler reconciles a VolumeGroupReplicationContent object type VolumeGroupReplicationContentReconciler struct { client.Client Scheme *runtime.Scheme + // ConnectionPool consists of map of Connection objects + Connpool *conn.ConnectionPool + // Timeout for the Reconcile operation. + Timeout time.Duration } //+kubebuilder:rbac:groups=replication.storage.openshift.io,resources=volumegroupreplicationcontents,verbs=get;list;watch;create;update;patch;delete @@ -40,14 +54,150 @@ type VolumeGroupReplicationContentReconciler struct { // Reconcile is part of the main kubernetes reconciliation loop which aims to // move the current state of the cluster closer to the desired state. func (r *VolumeGroupReplicationContentReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { - _ = log.FromContext(ctx) + logger := log.FromContext(ctx, "Request.Name", req.Name, "Request.Namespace", req.Namespace) + + // Fetch VolumeGroupReplicationContent instance + instance := &replicationv1alpha1.VolumeGroupReplicationContent{} + err := r.Client.Get(ctx, req.NamespacedName, instance) + if err != nil { + if errors.IsNotFound(err) { + logger.Info("volumeGroupReplicationContent resource not found") + return reconcile.Result{}, nil + } + return reconcile.Result{}, err + } + + volumeGroupClient, err := r.getVolumeGroupClient(ctx, instance.Spec.Provisioner) + if err != nil { + logger.Error(err, "Failed to get VolumeGroupClient") + + return reconcile.Result{}, err + } + + // Check if object is being deleted + if instance.GetDeletionTimestamp().IsZero() { + if err = r.addFinalizerToVGRContent(logger, instance); err != nil { + logger.Error(err, "failed to add VolumeGroupReplicationContent finalizer") + return reconcile.Result{}, err + } + } else { + // Check if the owner VGR is marked for deletion, only then remove the finalizer from VGRContent resource + vgrObj := &replicationv1alpha1.VolumeGroupReplication{} + namespacedObj := types.NamespacedName{Namespace: instance.Spec.VolumeGroupReplicationRef.Namespace, + Name: instance.Spec.VolumeGroupReplicationRef.Name} + err = r.Client.Get(ctx, namespacedObj, vgrObj) + if err != nil { + logger.Error(err, "failed to get owner VolumeGroupReplication") + return reconcile.Result{}, err + } + + if vgrObj.GetDeletionTimestamp().IsZero() { + logger.Info("cannot delete VolumeGroupReplicationContent resource, until owner VolumeGroupReplication instance is deleted") + return reconcile.Result{}, nil + } else { + // Delete the volume group, if created + if instance.Spec.VolumeGroupReplicationHandle != "" { + _, err := volumeGroupClient.DeleteVolumeGroup(instance.Spec.VolumeGroupReplicationHandle) + if err != nil { + logger.Error(err, "failed to delete volume group") + return reconcile.Result{}, err + } + } + + if err = r.removeFinalizerFromVGRContent(logger, instance); err != nil { + logger.Error(err, "failed to remove finalizer from VolumeGroupReplicationContent resource") + return reconcile.Result{}, err + } + } + } + + // Create/Update volume group + if instance.Spec.VolumeGroupReplicationHandle == "" { + groupName := fmt.Sprintf("volumegroup-%s", instance.UID) + resp, err := volumeGroupClient.CreateVolumeGroup(groupName, instance.Spec.Source.VolumeHandles) + if err != nil { + logger.Error(err, "failed to group volumes") + return reconcile.Result{}, err + } - return ctrl.Result{}, nil + // Update the group handle in the VolumeGroupReplicationContent CR + instance.Spec.VolumeGroupReplicationHandle = resp.GetVolumeGroupId() + err = r.Client.Update(ctx, instance) + if err != nil { + logger.Error(err, "failed to update group id in VGRContent") + return reconcile.Result{}, err + } + } else { + groupID := instance.Spec.VolumeGroupReplicationHandle + resp, err := volumeGroupClient.ModifyVolumeGroupMembership(groupID, instance.Spec.Source.VolumeHandles) + if err != nil { + logger.Error(err, "failed to modify volume group") + return reconcile.Result{}, err + } + + // Update the group handle in the VolumeGroupReplicationContent CR + instance.Spec.VolumeGroupReplicationHandle = resp.GetVolumeGroupId() + err = r.Client.Update(ctx, instance) + if err != nil { + logger.Error(err, "failed to update group id in VGRContent") + return reconcile.Result{}, err + } + } + + // Update VGRContent resource status + pvList := &corev1.PersistentVolumeList{} + err = r.Client.List(ctx, pvList) + if err != nil { + logger.Error(err, "failed to list PVs") + return reconcile.Result{}, err + } + + pvRefList := []corev1.LocalObjectReference{} + for _, pv := range pvList.Items { + if slices.ContainsFunc(instance.Spec.Source.VolumeHandles, func(handle string) bool { + return pv.Spec.CSI.VolumeHandle == handle + }) { + pvRefList = append(pvRefList, corev1.LocalObjectReference{ + Name: pv.Name, + }) + } + } + instance.Status.PersistentVolumeRefList = pvRefList + err = r.Client.Status().Update(ctx, instance) + if err != nil { + logger.Error(err, "failed to update VGRContent status") + return reconcile.Result{}, err + } + + return reconcile.Result{}, nil } // SetupWithManager sets up the controller with the Manager. func (r *VolumeGroupReplicationContentReconciler) SetupWithManager(mgr ctrl.Manager) error { + return ctrl.NewControllerManagedBy(mgr). For(&replicationv1alpha1.VolumeGroupReplicationContent{}). Complete(r) } + +func (r *VolumeGroupReplicationContentReconciler) getVolumeGroupClient(ctx context.Context, driverName string) (grpcClient.VolumeGroup, error) { + conn, err := r.Connpool.GetLeaderByDriver(ctx, r.Client, driverName) + if err != nil { + return nil, fmt.Errorf("no leader for the ControllerService of driver %q", driverName) + } + + for _, cap := range conn.Capabilities { + // validate if VOLUME_GROUP capability is supported by the driver. + if cap.GetVolumeGroup() == nil { + continue + } + + // validate of VOLUME_GROUP capability is enabled by the storage driver. + if cap.GetVolumeGroup().GetType() == identity.Capability_VolumeGroup_VOLUME_GROUP { + return grpcClient.NewVolumeGroupClient(conn.Client, r.Timeout), nil + } + } + + return nil, fmt.Errorf("leading CSIAddonsNode %q for driver %q does not support VolumeGroup", conn.Name, driverName) + +} diff --git a/controllers/replication.storage/volumereplication_controller.go b/controllers/replication.storage/volumereplication_controller.go index e61925e52..f9a35b4a8 100644 --- a/controllers/replication.storage/volumereplication_controller.go +++ b/controllers/replication.storage/volumereplication_controller.go @@ -34,11 +34,8 @@ import ( "google.golang.org/grpc/codes" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/errors" - "k8s.io/apimachinery/pkg/api/meta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/runtime" - "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/types" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" @@ -72,10 +69,10 @@ type VolumeReplicationReconciler struct { Replication grpcClient.VolumeReplication } -// +kubebuilder:rbac:groups=replication.storage.openshift.io,resources=volumereplications,verbs=get;list;watch;update -// +kubebuilder:rbac:groups=replication.storage.openshift.io,resources=volumereplications/status,verbs=update -// +kubebuilder:rbac:groups=replication.storage.openshift.io,resources=volumereplications/finalizers,verbs=update -// +kubebuilder:rbac:groups=replication.storage.openshift.io,resources=volumereplicationclasses,verbs=get;list;watch +//+kubebuilder:rbac:groups=replication.storage.openshift.io,resources=volumereplications,verbs=get;list;watch;update +//+kubebuilder:rbac:groups=replication.storage.openshift.io,resources=volumereplications/status,verbs=update +//+kubebuilder:rbac:groups=replication.storage.openshift.io,resources=volumereplications/finalizers,verbs=update +//+kubebuilder:rbac:groups=replication.storage.openshift.io,resources=volumereplicationclasses,verbs=get;list;watch //+kubebuilder:rbac:groups=core,resources=persistentvolumeclaims/finalizers,verbs=update //+kubebuilder:rbac:groups=core,resources=persistentvolumeclaims,verbs=get;list;watch @@ -106,25 +103,16 @@ func (r *VolumeReplicationReconciler) Reconcile(ctx context.Context, req ctrl.Re // Get VolumeReplicationClass vrcObj, err := r.getVolumeReplicationClass(logger, instance.Spec.VolumeReplicationClass) if err != nil { - setFailureCondition(instance) - uErr := r.updateReplicationStatus(instance, logger, getCurrentReplicationState(instance), err.Error()) - if uErr != nil { - logger.Error(uErr, "failed to update volumeReplication status", "VRName", instance.Name) - } - - return ctrl.Result{}, err + logger.Error(err, "failed to get volumeReplicationClass", "VRClassName", instance.Spec.VolumeReplicationClass) + err := r.setReplicationFailure(instance, logger, err, true) + return reconcile.Result{}, err } err = validatePrefixedParameters(vrcObj.Spec.Parameters) if err != nil { logger.Error(err, "failed to validate parameters of volumeReplicationClass", "VRCName", instance.Spec.VolumeReplicationClass) - setFailureCondition(instance) - uErr := r.updateReplicationStatus(instance, logger, getCurrentReplicationState(instance), err.Error()) - if uErr != nil { - logger.Error(uErr, "failed to update volumeReplication status", "VRName", instance.Name) - } - - return ctrl.Result{}, err + err := r.setReplicationFailure(instance, logger, err, true) + return reconcile.Result{}, err } // remove the prefix keys in volume replication class parameters parameters := filterPrefixedParameters(replicationParameterPrefix, vrcObj.Spec.Parameters) @@ -148,26 +136,16 @@ func (r *VolumeReplicationReconciler) Reconcile(ctx context.Context, req ctrl.Re pvc, pv, pvErr = r.getPVCDataSource(logger, nameSpacedName) if pvErr != nil { logger.Error(pvErr, "failed to get PVC", "PVCName", instance.Spec.DataSource.Name) - setFailureCondition(instance) - uErr := r.updateReplicationStatus(instance, logger, getCurrentReplicationState(instance), pvErr.Error()) - if uErr != nil { - logger.Error(uErr, "failed to update volumeReplication status", "VRName", instance.Name) - } - - return ctrl.Result{}, pvErr + err := r.setReplicationFailure(instance, logger, err, true) + return reconcile.Result{}, err } volumeHandle = pv.Spec.CSI.VolumeHandle default: err = fmt.Errorf("unsupported datasource kind") logger.Error(err, "given kind not supported", "Kind", instance.Spec.DataSource.Kind) - setFailureCondition(instance) - uErr := r.updateReplicationStatus(instance, logger, getCurrentReplicationState(instance), err.Error()) - if uErr != nil { - logger.Error(uErr, "failed to update volumeReplication status", "VRName", instance.Name) - } - - return ctrl.Result{}, nil + err := r.setReplicationFailure(instance, logger, err, true) + return reconcile.Result{}, err } logger.Info("volume handle", "VolumeHandleName", volumeHandle) @@ -204,13 +182,13 @@ func (r *VolumeReplicationReconciler) Reconcile(ctx context.Context, req ctrl.Re return reconcile.Result{}, err } - err = r.annotatePVCWithOwner(ctx, logger, req.Name, pvc) + err = AnnotatePVCWithOwner(r.Client, logger, req.Name, pvc, replicationv1alpha1.VolumeGroupReplicationNameAnnotation) if err != nil { logger.Error(err, "Failed to annotate PVC owner") return ctrl.Result{}, err } - if err = r.addFinalizerToPVC(logger, pvc); err != nil { + if err = AddFinalizerToPVC(r.Client, logger, pvc, pvcReplicationFinalizer); err != nil { logger.Error(err, "Failed to add PersistentVolumeClaim finalizer") return reconcile.Result{}, err @@ -224,13 +202,13 @@ func (r *VolumeReplicationReconciler) Reconcile(ctx context.Context, req ctrl.Re return ctrl.Result{}, err } - if err = r.removeOwnerFromPVCAnnotation(ctx, logger, pvc); err != nil { + if err = RemoveOwnerFromPVCAnnotation(r.Client, logger, pvc, replicationv1alpha1.VolumeGroupReplicationNameAnnotation); err != nil { logger.Error(err, "Failed to remove VolumeReplication annotation from PersistentVolumeClaim") return reconcile.Result{}, err } - if err = r.removeFinalizerFromPVC(logger, pvc); err != nil { + if err = RemoveFinalizerFromPVC(r.Client, logger, pvc, pvcReplicationFinalizer); err != nil { logger.Error(err, "Failed to remove PersistentVolumeClaim finalizer") return reconcile.Result{}, err @@ -269,13 +247,7 @@ func (r *VolumeReplicationReconciler) Reconcile(ctx context.Context, req ctrl.Re // enable replication only if its not primary if err = r.enableReplication(vr); err != nil { logger.Error(err, "failed to enable replication") - setFailureCondition(instance) - msg := replication.GetMessageFromError(err) - uErr := r.updateReplicationStatus(instance, logger, getCurrentReplicationState(instance), msg) - if uErr != nil { - logger.Error(uErr, "failed to update volumeReplication status", "VRName", instance.Name) - } - + err := r.setReplicationFailure(instance, logger, err, true) return reconcile.Result{}, err } replicationErr = r.markVolumeAsPrimary(vr) @@ -291,16 +263,13 @@ func (r *VolumeReplicationReconciler) Reconcile(ctx context.Context, req ctrl.Re logger.Info("volume is not ready to use") // set the status.State to secondary as the // instance.Status.State is primary for the first time. - err = r.updateReplicationStatus(instance, logger, getReplicationState(instance), "volume is marked secondary and is degraded") - if err != nil { - return ctrl.Result{}, err - } - + errMsg := fmt.Errorf("volume is marked secondary and is degraded") + err := r.setReplicationFailure(instance, logger, errMsg, false) return ctrl.Result{ Requeue: true, // Setting Requeue time for 15 seconds RequeueAfter: time.Second * 15, - }, nil + }, err } } else { replicationErr = r.markVolumeAsSecondary(vr) @@ -318,41 +287,29 @@ func (r *VolumeReplicationReconciler) Reconcile(ctx context.Context, req ctrl.Re default: replicationErr = fmt.Errorf("unsupported volume state") logger.Error(replicationErr, "given volume state is not supported", "ReplicationState", instance.Spec.ReplicationState) - setFailureCondition(instance) - err = r.updateReplicationStatus(instance, logger, getCurrentReplicationState(instance), replicationErr.Error()) - if err != nil { - logger.Error(err, "failed to update volumeReplication status", "VRName", instance.Name) - } - - return ctrl.Result{}, nil + err := r.setReplicationFailure(instance, logger, replicationErr, true) + return reconcile.Result{}, err } if replicationErr != nil { - msg := replication.GetMessageFromError(replicationErr) logger.Error(replicationErr, "failed to Replicate", "ReplicationState", instance.Spec.ReplicationState) - err = r.updateReplicationStatus(instance, logger, getCurrentReplicationState(instance), msg) - if err != nil { - logger.Error(err, "failed to update volumeReplication status", "VRName", instance.Name) - } + _ = r.setReplicationFailure(instance, logger, replicationErr, false) if instance.Status.State == replicationv1alpha1.SecondaryState { return ctrl.Result{ Requeue: true, // in case of any error during secondary state, requeue for every 15 seconds. RequeueAfter: time.Second * 15, - }, nil + }, err } - return ctrl.Result{}, replicationErr + return reconcile.Result{}, replicationErr } if requeueForResync { logger.Info("volume is not ready to use, requeuing for resync") - - err = r.updateReplicationStatus(instance, logger, getCurrentReplicationState(instance), "volume is degraded") - if err != nil { - logger.Error(err, "failed to update volumeReplication status", "VRName", instance.Name) - } + msg := fmt.Errorf("volume is degraded") + err := r.setReplicationFailure(instance, logger, msg, false) return ctrl.Result{ Requeue: true, @@ -360,7 +317,7 @@ func (r *VolumeReplicationReconciler) Reconcile(ctx context.Context, req ctrl.Re // and having default Requeue exponential backoff time can affect // the RTO time. RequeueAfter: time.Second * 30, - }, nil + }, err } var msg string @@ -402,7 +359,7 @@ func (r *VolumeReplicationReconciler) Reconcile(ctx context.Context, req ctrl.Re if instance.Spec.ReplicationState == replicationv1alpha1.Secondary { instance.Status.LastSyncTime = nil } - err = r.updateReplicationStatus(instance, logger, getReplicationState(instance), msg) + err = r.setReplicationFailure(instance, logger, fmt.Errorf(msg), false) if err != nil { return ctrl.Result{}, err } @@ -476,17 +433,18 @@ func (r *VolumeReplicationReconciler) getReplicationClient(ctx context.Context, } -func (r *VolumeReplicationReconciler) updateReplicationStatus( +func (r *VolumeReplicationReconciler) setReplicationFailure( instance *replicationv1alpha1.VolumeReplication, - logger logr.Logger, - state replicationv1alpha1.State, - message string) error { - instance.Status.State = state - instance.Status.Message = message + logger logr.Logger, err error, updateCondtion bool) error { + + if updateCondtion { + SetFailureCondition(instance.Spec.ReplicationState, &instance.Status, instance.Generation) + } + instance.Status.State = GetCurrentReplicationState(instance.Status.State) + instance.Status.Message = replication.GetMessageFromError(err) instance.Status.ObservedGeneration = instance.Generation if err := r.Client.Status().Update(context.TODO(), instance); err != nil { - logger.Error(err, "failed to update status") - + logger.Error(err, "failed to update volumeReplication status", "VRName", instance.Name) return err } @@ -497,7 +455,6 @@ func (r *VolumeReplicationReconciler) updateReplicationStatus( func (r *VolumeReplicationReconciler) SetupWithManager(mgr ctrl.Manager, ctrlOptions controller.Options) error { err := r.waitForCrds() if err != nil { - return err } @@ -513,46 +470,21 @@ func (r *VolumeReplicationReconciler) SetupWithManager(mgr ctrl.Manager, ctrlOpt func (r *VolumeReplicationReconciler) waitForCrds() error { logger := log.FromContext(context.TODO(), "Name", "checkingDependencies") - err := r.waitForVolumeReplicationResource(logger, volumeReplicationClass) + err := WaitForVolumeReplicationResource(r.Client, logger, volumeReplicationClass) if err != nil { logger.Error(err, "failed to wait for VolumeReplicationClass CRD") - return err } - err = r.waitForVolumeReplicationResource(logger, volumeReplication) + err = WaitForVolumeReplicationResource(r.Client, logger, volumeReplication) if err != nil { logger.Error(err, "failed to wait for VolumeReplication CRD") - return err } return nil } -func (r *VolumeReplicationReconciler) waitForVolumeReplicationResource(logger logr.Logger, resourceName string) error { - unstructuredResource := &unstructured.UnstructuredList{} - unstructuredResource.SetGroupVersionKind(schema.GroupVersionKind{ - Group: replicationv1alpha1.GroupVersion.Group, - Kind: resourceName, - Version: replicationv1alpha1.GroupVersion.Version, - }) - for { - err := r.Client.List(context.TODO(), unstructuredResource) - if err == nil { - return nil - } - // return errors other than NoMatch - if !meta.IsNoMatchError(err) { - logger.Error(err, "got an unexpected error while waiting for resource", "Resource", resourceName) - - return err - } - logger.Info("resource does not exist", "Resource", resourceName) - time.Sleep(5 * time.Second) - } -} - // volumeReplicationInstance contains the attributes // that can be useful in reconciling a particular // instance of the VolumeReplication resource. @@ -721,38 +653,6 @@ func (r *VolumeReplicationReconciler) getVolumeReplicationInfo(vr *volumeReplica return infoResponse, nil } -func getReplicationState(instance *replicationv1alpha1.VolumeReplication) replicationv1alpha1.State { - switch instance.Spec.ReplicationState { - case replicationv1alpha1.Primary: - return replicationv1alpha1.PrimaryState - case replicationv1alpha1.Secondary: - return replicationv1alpha1.SecondaryState - case replicationv1alpha1.Resync: - return replicationv1alpha1.SecondaryState - } - - return replicationv1alpha1.UnknownState -} - -func getCurrentReplicationState(instance *replicationv1alpha1.VolumeReplication) replicationv1alpha1.State { - if instance.Status.State == "" { - return replicationv1alpha1.UnknownState - } - - return instance.Status.State -} - -func setFailureCondition(instance *replicationv1alpha1.VolumeReplication) { - switch instance.Spec.ReplicationState { - case replicationv1alpha1.Primary: - setFailedPromotionCondition(&instance.Status.Conditions, instance.Generation) - case replicationv1alpha1.Secondary: - setFailedDemotionCondition(&instance.Status.Conditions, instance.Generation) - case replicationv1alpha1.Resync: - setFailedResyncCondition(&instance.Status.Conditions, instance.Generation) - } -} - func getCurrentTime() *metav1.Time { metav1NowTime := metav1.NewTime(time.Now())