Skip to content

Commit

Permalink
groupreplication: add controller logic for volume group replication
Browse files Browse the repository at this point in the history
added controller logic for volume group replication

Signed-off-by: Nikhil-Ladha <nikhilladha1999@gmail.com>
  • Loading branch information
Nikhil-Ladha committed Jul 8, 2024
1 parent 9e26374 commit 48fed15
Show file tree
Hide file tree
Showing 13 changed files with 1,249 additions and 190 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,16 +21,21 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

const (
VolumeGroupReplicationNameAnnotation = "replication.storage.openshift.io/volume-group-replication-name"
)

// VolumeGroupReplicationSpec defines the desired state of VolumeGroupReplication
type VolumeGroupReplicationSpec struct {
// volumeGroupReplicationClassName is the volumeGroupReplicationClass name for this VolumeGroupReplication resource
// +kubebuilder:validation:Required
// +kubebuilder:validation:XValidation:rule="self == oldSelf",message="volumeGroupReplicationClassName is immutable"
VolumeGroupReplicationClassName string `json:"volumeGroupReplicationClassName"`

// volumeReplicationClassName is the volumeReplicationClass name for VolumeReplication object
// volumeReplicationClassName is the volumeReplicationClass name for the VolumeReplication object
// created for this volumeGroupReplication
// +kubebuilder:validation:Required
// +kubebuilder:validation:XValidation:rule="self == oldSelf",message="volumReplicationClassName is immutable"
// +kubebuilder:validation:XValidation:rule="self == oldSelf",message="volumeReplicationClassName is immutable"
VolumeReplicationClassName string `json:"volumeReplicationClassName"`

// Name of the VolumeReplication object created for this volumeGroupReplication
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,9 +52,8 @@ type VolumeGroupReplicationContentSpec struct {
// +optional
VolumeGroupReplicationClassName string `json:"volumeGroupReplicationClassName"`

// Source specifies whether the snapshot is (or should be) dynamically provisioned
// Source specifies whether the volume is (or should be) dynamically provisioned
// or already exists, and just requires a Kubernetes object representation.
// This field is immutable after creation.
// Required.
Source VolumeGroupReplicationContentSource `json:"source"`
}
Expand All @@ -68,7 +67,7 @@ type VolumeGroupReplicationContentSource struct {

// VolumeGroupReplicationContentStatus defines the status of VolumeGroupReplicationContent
type VolumeGroupReplicationContentStatus struct {
// PersistentVolumeRefList is the list of of PV for the group replication
// PersistentVolumeRefList is the list of PV for the group replication
// The maximum number of allowed PV in the group is 100.
// +optional
PersistentVolumeRefList []corev1.LocalObjectReference `json:"persistentVolumeRefList,omitempty"`
Expand Down
6 changes: 4 additions & 2 deletions cmd/manager/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -205,8 +205,10 @@ func main() {
os.Exit(1)
}
if err = (&replicationController.VolumeGroupReplicationContentReconciler{
Client: mgr.GetClient(),
Scheme: mgr.GetScheme(),
Client: mgr.GetClient(),
Scheme: mgr.GetScheme(),
Connpool: connPool,
Timeout: defaultTimeout,
}).SetupWithManager(mgr); err != nil {
setupLog.Error(err, "unable to create controller", "controller", "VolumeGroupReplicationContent")
os.Exit(1)
Expand Down
97 changes: 82 additions & 15 deletions controllers/replication.storage/finalizers.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,13 @@ import (

"github.com/go-logr/logr"
corev1 "k8s.io/api/core/v1"
"sigs.k8s.io/controller-runtime/pkg/client"
)

const (
volumeReplicationFinalizer = "replication.storage.openshift.io"
pvcReplicationFinalizer = "replication.storage.openshift.io/pvc-protection"
vgrReplicationFinalizer = "replication.storage.openshift.io/vgr-protection"
)

// addFinalizerToVR adds the VR finalizer on the VolumeReplication instance.
Expand Down Expand Up @@ -64,31 +66,96 @@ func (r *VolumeReplicationReconciler) removeFinalizerFromVR(logger logr.Logger,
return nil
}

// addFinalizerToPVC adds the VR finalizer on the PersistentVolumeClaim.
func (r *VolumeReplicationReconciler) addFinalizerToPVC(logger logr.Logger, pvc *corev1.PersistentVolumeClaim) error {
if !slices.Contains(pvc.ObjectMeta.Finalizers, pvcReplicationFinalizer) {
logger.Info("adding finalizer to PersistentVolumeClaim object", "Finalizer", pvcReplicationFinalizer)
pvc.ObjectMeta.Finalizers = append(pvc.ObjectMeta.Finalizers, pvcReplicationFinalizer)
if err := r.Client.Update(context.TODO(), pvc); err != nil {
// AddFinalizerToPVC adds the VR finalizer on the PersistentVolumeClaim.
func AddFinalizerToPVC(client client.Client, logger logr.Logger, pvc *corev1.PersistentVolumeClaim,
replicationFinalizer string) error {
if !slices.Contains(pvc.ObjectMeta.Finalizers, replicationFinalizer) {
logger.Info("adding finalizer to PersistentVolumeClaim object", "Finalizer", replicationFinalizer)
pvc.ObjectMeta.Finalizers = append(pvc.ObjectMeta.Finalizers, replicationFinalizer)
if err := client.Update(context.TODO(), pvc); err != nil {
return fmt.Errorf("failed to add finalizer (%s) to PersistentVolumeClaim resource"+
" (%s/%s) %w",
pvcReplicationFinalizer, pvc.Namespace, pvc.Name, err)
replicationFinalizer, pvc.Namespace, pvc.Name, err)
}
}

return nil
}

// removeFinalizerFromPVC removes the VR finalizer on PersistentVolumeClaim.
func (r *VolumeReplicationReconciler) removeFinalizerFromPVC(logger logr.Logger, pvc *corev1.PersistentVolumeClaim,
) error {
if slices.Contains(pvc.ObjectMeta.Finalizers, pvcReplicationFinalizer) {
logger.Info("removing finalizer from PersistentVolumeClaim object", "Finalizer", pvcReplicationFinalizer)
pvc.ObjectMeta.Finalizers = util.RemoveFromSlice(pvc.ObjectMeta.Finalizers, pvcReplicationFinalizer)
if err := r.Client.Update(context.TODO(), pvc); err != nil {
// RemoveFinalizerFromPVC removes the VR finalizer on PersistentVolumeClaim.
func RemoveFinalizerFromPVC(client client.Client, logger logr.Logger, pvc *corev1.PersistentVolumeClaim,
replicationFinalizer string) error {
if slices.Contains(pvc.ObjectMeta.Finalizers, replicationFinalizer) {
logger.Info("removing finalizer from PersistentVolumeClaim object", "Finalizer", replicationFinalizer)
pvc.ObjectMeta.Finalizers = util.RemoveFromSlice(pvc.ObjectMeta.Finalizers, replicationFinalizer)
if err := client.Update(context.TODO(), pvc); err != nil {
return fmt.Errorf("failed to remove finalizer (%s) from PersistentVolumeClaim resource"+
" (%s/%s), %w",
pvcReplicationFinalizer, pvc.Namespace, pvc.Name, err)
replicationFinalizer, pvc.Namespace, pvc.Name, err)
}
}

return nil
}

// AddFinalizerToVGR adds the VGR finalizer on the VolumeGroupReplication resource
func AddFinalizerToVGR(client client.Client, logger logr.Logger, vgr *replicationv1alpha1.VolumeGroupReplication) error {
if !slices.Contains(vgr.ObjectMeta.Finalizers, vgrReplicationFinalizer) {
logger.Info("adding finalizer to volumeGroupReplication object", "Finalizer", vgrReplicationFinalizer)
vgr.ObjectMeta.Finalizers = append(vgr.ObjectMeta.Finalizers, vgrReplicationFinalizer)
if err := client.Update(context.TODO(), vgr); err != nil {
return fmt.Errorf("failed to add finalizer (%s) to VolumeGroupReplication resource"+
" (%s/%s) %w",
vgrReplicationFinalizer, vgr.Namespace, vgr.Name, err)
}
}

return nil
}

// RemoveFinalizerFromVGR removes the VR finalizer from the VolumeGroupReplication instance.
func RemoveFinalizerFromVGR(client client.Client, logger logr.Logger, vgr *replicationv1alpha1.VolumeGroupReplication) error {
if slices.Contains(vgr.ObjectMeta.Finalizers, vgrReplicationFinalizer) {
logger.Info("removing finalizer from volumeGroupReplication object", "Finalizer", vgrReplicationFinalizer)
vgr.ObjectMeta.Finalizers = util.RemoveFromSlice(vgr.ObjectMeta.Finalizers, vgrReplicationFinalizer)
if err := client.Update(context.TODO(), vgr); err != nil {
return fmt.Errorf("failed to remove finalizer (%s) from VolumeGroupReplication resource"+
" (%s/%s), %w",
vgrReplicationFinalizer, vgr.Namespace, vgr.Name, err)
}
}

return nil
}

// addFinalizerToVGRContent adds the VGR finalizer on the VolumeGroupReplicationContent resource
func (r *VolumeGroupReplicationContentReconciler) addFinalizerToVGRContent(logger logr.Logger,
vgrContent *replicationv1alpha1.VolumeGroupReplicationContent) error {

if !slices.Contains(vgrContent.ObjectMeta.Finalizers, vgrReplicationFinalizer) {
logger.Info("adding finalizer to volumeGroupReplicationContent object", "Finalizer", vgrReplicationFinalizer)
vgrContent.ObjectMeta.Finalizers = append(vgrContent.ObjectMeta.Finalizers, vgrReplicationFinalizer)
if err := r.Client.Update(context.TODO(), vgrContent); err != nil {
return fmt.Errorf("failed to add finalizer (%s) to VolumeGroupReplicationContent resource"+
" (%s/%s) %w",
vgrReplicationFinalizer, vgrContent.Namespace, vgrContent.Name, err)
}
}

return nil
}

// removeFinalizerFromVR removes the VR finalizer from the VolumeReplication instance.
func (r *VolumeGroupReplicationContentReconciler) removeFinalizerFromVGRContent(logger logr.Logger,
vgrContent *replicationv1alpha1.VolumeGroupReplicationContent) error {

if slices.Contains(vgrContent.ObjectMeta.Finalizers, vgrReplicationFinalizer) {
logger.Info("removing finalizer from volumeGroupReplicationContent object", "Finalizer", vgrReplicationFinalizer)
vgrContent.ObjectMeta.Finalizers = util.RemoveFromSlice(vgrContent.ObjectMeta.Finalizers, vgrReplicationFinalizer)
if err := r.Client.Update(context.TODO(), vgrContent); err != nil {
return fmt.Errorf("failed to remove finalizer (%s) from VolumeGroupReplicationContent resource"+
" (%s/%s), %w",
vgrReplicationFinalizer, vgrContent.Namespace, vgrContent.Name, err)
}
}

Expand Down
39 changes: 23 additions & 16 deletions controllers/replication.storage/pvc.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/types"
"sigs.k8s.io/controller-runtime/pkg/client"

replicationv1alpha1 "github.com/csi-addons/kubernetes-csi-addons/apis/replication.storage/v1alpha1"
)
Expand Down Expand Up @@ -59,22 +60,27 @@ func (r VolumeReplicationReconciler) getPVCDataSource(logger logr.Logger, req ty
return pvc, pv, nil
}

// annotatePVCWithOwner will add the VolumeReplication details to the PVC annotations.
func (r *VolumeReplicationReconciler) annotatePVCWithOwner(ctx context.Context, logger logr.Logger, reqOwnerName string, pvc *corev1.PersistentVolumeClaim) error {
// AnnotatePVCWithOwner will add the VolumeReplication/VolumeGroupReplication details to the PVC annotations.
func AnnotatePVCWithOwner(client client.Client, logger logr.Logger, reqOwnerName string,
pvc *corev1.PersistentVolumeClaim, pvcAnnotation string) error {
if pvc.ObjectMeta.Annotations == nil {
pvc.ObjectMeta.Annotations = map[string]string{}
}

currentOwnerName := pvc.ObjectMeta.Annotations[replicationv1alpha1.VolumeReplicationNameAnnotation]
if pvc.ObjectMeta.Annotations[replicationv1alpha1.VolumeReplicationNameAnnotation] != "" &&
pvc.ObjectMeta.Annotations[replicationv1alpha1.VolumeGroupReplicationNameAnnotation] != "" {
logger.Info("PVC can't be part of both VolumeGroupReplication and VolumeReplication")
return fmt.Errorf("PVC %q can't be owned by both VolumeReplication and VolumeGroupReplication", pvc.Name)
}

currentOwnerName := pvc.ObjectMeta.Annotations[pvcAnnotation]
if currentOwnerName == "" {
logger.Info("setting owner on PVC annotation", "Name", pvc.Name, "owner", reqOwnerName)
pvc.ObjectMeta.Annotations[replicationv1alpha1.VolumeReplicationNameAnnotation] = reqOwnerName
err := r.Update(ctx, pvc)
pvc.ObjectMeta.Annotations[pvcAnnotation] = reqOwnerName
err := client.Update(context.TODO(), pvc)
if err != nil {
logger.Error(err, "Failed to update PVC annotation", "Name", pvc.Name)

return fmt.Errorf("failed to update PVC %q annotation for VolumeReplication: %w",
pvc.Name, err)
return fmt.Errorf("failed to update PVC %q annotation for replication: %w", pvc.Name, err)
}

return nil
Expand All @@ -86,22 +92,23 @@ func (r *VolumeReplicationReconciler) annotatePVCWithOwner(ctx context.Context,
"current owner", currentOwnerName,
"requested owner", reqOwnerName)

return fmt.Errorf("PVC %q not owned by VolumeReplication %q",
return fmt.Errorf("PVC %q not owned by correct VolumeReplication/VolumeGroupReplication %q",
pvc.Name, reqOwnerName)
}

return nil
}

// removeOwnerFromPVCAnnotation removes the VolumeReplication owner from the PVC annotations.
func (r *VolumeReplicationReconciler) removeOwnerFromPVCAnnotation(ctx context.Context, logger logr.Logger, pvc *corev1.PersistentVolumeClaim) error {
if _, ok := pvc.ObjectMeta.Annotations[replicationv1alpha1.VolumeReplicationNameAnnotation]; ok {
logger.Info("removing owner annotation from PersistentVolumeClaim object", "Annotation", replicationv1alpha1.VolumeReplicationNameAnnotation)
delete(pvc.ObjectMeta.Annotations, replicationv1alpha1.VolumeReplicationNameAnnotation)
if err := r.Client.Update(ctx, pvc); err != nil {
// RemoveOwnerFromPVCAnnotation removes the VolumeReplication/VolumeGroupReplication owner from the PVC annotations.
func RemoveOwnerFromPVCAnnotation(client client.Client, logger logr.Logger, pvc *corev1.PersistentVolumeClaim,
pvcAnnotation string) error {
if _, ok := pvc.ObjectMeta.Annotations[pvcAnnotation]; ok {
logger.Info("removing owner annotation from PersistentVolumeClaim object", "Annotation", pvcAnnotation)
delete(pvc.ObjectMeta.Annotations, pvcAnnotation)
if err := client.Update(context.TODO(), pvc); err != nil {
return fmt.Errorf("failed to remove annotation %q from PersistentVolumeClaim "+
"%q %w",
replicationv1alpha1.VolumeReplicationNameAnnotation, pvc.Name, err)
pvcAnnotation, pvc.Name, err)
}
}

Expand Down
8 changes: 4 additions & 4 deletions controllers/replication.storage/pvc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -226,7 +226,7 @@ func TestVolumeReplicationReconciler_annotatePVCWithOwner(t *testing.T) {

ctx := context.TODO()
reconciler := createFakeVolumeReplicationReconciler(t, testPVC, volumeReplication)
err := reconciler.annotatePVCWithOwner(ctx, log.FromContext(context.TODO()), vrName, testPVC)
err := AnnotatePVCWithOwner(reconciler.Client, log.FromContext(context.TODO()), vrName, testPVC, replicationv1alpha1.VolumeReplicationNameAnnotation)
if tc.errorExpected {
assert.Error(t, err)
} else {
Expand All @@ -244,11 +244,11 @@ func TestVolumeReplicationReconciler_annotatePVCWithOwner(t *testing.T) {
assert.Equal(t, testPVC.ObjectMeta.Annotations[replicationv1alpha1.VolumeReplicationNameAnnotation], vrName)
}

err = reconciler.removeOwnerFromPVCAnnotation(context.TODO(), log.FromContext(context.TODO()), testPVC)
err = RemoveOwnerFromPVCAnnotation(reconciler.Client, log.FromContext(context.TODO()), testPVC, replicationv1alpha1.VolumeReplicationNameAnnotation)
assert.NoError(t, err)

// try calling delete again, it should not fail
err = reconciler.removeOwnerFromPVCAnnotation(context.TODO(), log.FromContext(context.TODO()), testPVC)
err = RemoveOwnerFromPVCAnnotation(reconciler.Client, log.FromContext(context.TODO()), testPVC, replicationv1alpha1.VolumeReplicationNameAnnotation)
assert.NoError(t, err)

}
Expand All @@ -262,6 +262,6 @@ func TestVolumeReplicationReconciler_annotatePVCWithOwner(t *testing.T) {
}
volumeReplication := &replicationv1alpha1.VolumeReplication{}
reconciler := createFakeVolumeReplicationReconciler(t, pvc, volumeReplication)
err := reconciler.removeOwnerFromPVCAnnotation(context.TODO(), log.FromContext(context.TODO()), pvc)
err := RemoveOwnerFromPVCAnnotation(reconciler.Client, log.FromContext(context.TODO()), pvc, replicationv1alpha1.VolumeReplicationNameAnnotation)
assert.NoError(t, err)
}
85 changes: 85 additions & 0 deletions controllers/replication.storage/utils.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
/*
Copyright 2024 The Kubernetes-CSI-Addons Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package controllers

import (
"context"
"time"

replicationv1alpha1 "github.com/csi-addons/kubernetes-csi-addons/apis/replication.storage/v1alpha1"
"github.com/go-logr/logr"
"k8s.io/apimachinery/pkg/api/meta"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime/schema"
"sigs.k8s.io/controller-runtime/pkg/client"
)

func SetFailureCondition(instanceState replicationv1alpha1.ReplicationState,
instanceStatus *replicationv1alpha1.VolumeReplicationStatus, instanceGeneration int64) {

switch instanceState {
case replicationv1alpha1.Primary:
setFailedPromotionCondition(&instanceStatus.Conditions, instanceGeneration)
case replicationv1alpha1.Secondary:
setFailedDemotionCondition(&instanceStatus.Conditions, instanceGeneration)
case replicationv1alpha1.Resync:
setFailedResyncCondition(&instanceStatus.Conditions, instanceGeneration)
}
}

func GetReplicationState(instanceState replicationv1alpha1.ReplicationState) replicationv1alpha1.State {
switch instanceState {
case replicationv1alpha1.Primary:
return replicationv1alpha1.PrimaryState
case replicationv1alpha1.Secondary:
return replicationv1alpha1.SecondaryState
case replicationv1alpha1.Resync:
return replicationv1alpha1.SecondaryState
}

return replicationv1alpha1.UnknownState
}

func GetCurrentReplicationState(instanceStatusState replicationv1alpha1.State) replicationv1alpha1.State {
if instanceStatusState == "" {
return replicationv1alpha1.UnknownState
}

return instanceStatusState
}

func WaitForVolumeReplicationResource(client client.Client, logger logr.Logger, resourceName string) error {
unstructuredResource := &unstructured.UnstructuredList{}
unstructuredResource.SetGroupVersionKind(schema.GroupVersionKind{
Group: replicationv1alpha1.GroupVersion.Group,
Kind: resourceName,
Version: replicationv1alpha1.GroupVersion.Version,
})
for {
err := client.List(context.TODO(), unstructuredResource)
if err == nil {
return nil
}
// return errors other than NoMatch
if !meta.IsNoMatchError(err) {
logger.Error(err, "got an unexpected error while waiting for resource", "Resource", resourceName)
return err
}
logger.Info("resource does not exist", "Resource", resourceName)
time.Sleep(5 * time.Second)
}
}
Loading

0 comments on commit 48fed15

Please sign in to comment.