From 7dbb200657f7e61360e1e075cef0fc4f047445a7 Mon Sep 17 00:00:00 2001 From: Madhu Rajanna Date: Fri, 19 Jul 2024 10:40:21 +0200 Subject: [PATCH] rbd: refactor the replication code added interface to refactor mirroring to work for both volume and group Signed-off-by: Madhu Rajanna --- internal/csi-addons/rbd/replication.go | 269 +++++++++----------- internal/csi-addons/rbd/replication_test.go | 183 +++++++------ internal/rbd/controllerserver.go | 19 +- internal/rbd/mirror.go | 171 +++++++++---- internal/rbd/rbd_util.go | 8 +- internal/rbd/replication.go | 41 ++- internal/rbd/types/group.go | 2 + internal/rbd/types/mirror.go | 77 ++++++ internal/rbd/types/volume.go | 21 ++ 9 files changed, 483 insertions(+), 308 deletions(-) create mode 100644 internal/rbd/types/mirror.go diff --git a/internal/csi-addons/rbd/replication.go b/internal/csi-addons/rbd/replication.go index 544907b4f497..d4e8ee17579d 100644 --- a/internal/csi-addons/rbd/replication.go +++ b/internal/csi-addons/rbd/replication.go @@ -27,7 +27,9 @@ import ( "time" csicommon "github.com/ceph/ceph-csi/internal/csi-common" + "github.com/ceph/ceph-csi/internal/rbd" corerbd "github.com/ceph/ceph-csi/internal/rbd" + "github.com/ceph/ceph-csi/internal/rbd/types" "github.com/ceph/ceph-csi/internal/util" "github.com/ceph/ceph-csi/internal/util/log" @@ -124,18 +126,18 @@ func getForceOption(ctx context.Context, parameters map[string]string) (bool, er // getFlattenMode gets flatten mode from the input GRPC request parameters. // flattenMode is the key to check the mode in the parameters. -func getFlattenMode(ctx context.Context, parameters map[string]string) (corerbd.FlattenMode, error) { +func getFlattenMode(ctx context.Context, parameters map[string]string) (types.FlattenMode, error) { val, ok := parameters[flattenModeKey] if !ok { log.DebugLog(ctx, "%q is not set in parameters, setting to default (%v)", - flattenModeKey, corerbd.FlattenModeNever) + flattenModeKey, types.FlattenModeNever) - return corerbd.FlattenModeNever, nil + return types.FlattenModeNever, nil } - mode := corerbd.FlattenMode(val) + mode := types.FlattenMode(val) switch mode { - case corerbd.FlattenModeForce, corerbd.FlattenModeNever: + case types.FlattenModeForce, types.FlattenModeNever: return mode, nil } log.ErrorLog(ctx, "%q=%q is not supported", flattenModeKey, val) @@ -270,24 +272,26 @@ func (rs *ReplicationServer) EnableVolumeReplication(ctx context.Context, } defer rs.VolumeLocks.Release(volumeID) - rbdVol, err := corerbd.GenVolFromVolID(ctx, volumeID, cr, req.GetSecrets()) - defer func() { - if rbdVol != nil { - rbdVol.Destroy(ctx) - } - }() + mgr := rbd.NewManager(req.GetParameters(), req.GetSecrets()) + defer mgr.Destroy(ctx) + var mirror types.Mirror + var rbdVol types.Volume + + rbdVol, err = mgr.GetVolumeByID(ctx, volumeID) if err != nil { switch { case errors.Is(err, corerbd.ErrImageNotFound): err = status.Errorf(codes.NotFound, "volume %s not found", volumeID) case errors.Is(err, util.ErrPoolNotFound): - err = status.Errorf(codes.NotFound, "pool %s not found for %s", rbdVol.Pool, volumeID) + err = status.Errorf(codes.NotFound, "pool %s not found for %s", rbdVol.GetPoolName(), volumeID) default: err = status.Errorf(codes.Internal, err.Error()) } return nil, err } + mirror = rbdVol + // extract the mirroring mode mirroringMode, err := getMirroringMode(ctx, req.GetParameters()) if err != nil { @@ -299,21 +303,20 @@ func (rs *ReplicationServer) EnableVolumeReplication(ctx context.Context, return nil, err } - mirroringInfo, err := rbdVol.GetImageMirroringInfo() + info, err := mirror.GetMirroringInfo() if err != nil { log.ErrorLog(ctx, err.Error()) return nil, status.Error(codes.Internal, err.Error()) } - - if mirroringInfo.State != librbd.MirrorImageEnabled { + if info.GetState() != librbd.MirrorImageEnabled.String() { err = rbdVol.HandleParentImageExistence(ctx, flattenMode) if err != nil { log.ErrorLog(ctx, err.Error()) return nil, getGRPCError(err) } - err = rbdVol.EnableImageMirroring(mirroringMode) + err = mirror.EnableMirroring(mirroringMode) if err != nil { log.ErrorLog(ctx, err.Error()) @@ -347,52 +350,52 @@ func (rs *ReplicationServer) DisableVolumeReplication(ctx context.Context, } defer rs.VolumeLocks.Release(volumeID) - rbdVol, err := corerbd.GenVolFromVolID(ctx, volumeID, cr, req.GetSecrets()) - defer func() { - if rbdVol != nil { - rbdVol.Destroy(ctx) - } - }() - if err != nil { + mgr := rbd.NewManager(req.GetParameters(), req.GetSecrets()) + defer mgr.Destroy(ctx) + var mirror types.Mirror + + rbdVol, rErr := mgr.GetVolumeByID(ctx, volumeID) + if rErr != nil { switch { - case errors.Is(err, corerbd.ErrImageNotFound): + case errors.Is(rErr, corerbd.ErrImageNotFound): err = status.Errorf(codes.NotFound, "volume %s not found", volumeID) - case errors.Is(err, util.ErrPoolNotFound): - err = status.Errorf(codes.NotFound, "pool %s not found for %s", rbdVol.Pool, volumeID) + case errors.Is(rErr, util.ErrPoolNotFound): + err = status.Errorf(codes.NotFound, "pool %s not found for %s", rbdVol.GetPoolName(), volumeID) default: - err = status.Errorf(codes.Internal, err.Error()) + err = status.Errorf(codes.Internal, rErr.Error()) } return nil, err } + mirror = rbdVol + // extract the force option force, err := getForceOption(ctx, req.GetParameters()) if err != nil { return nil, err } - mirroringInfo, err := rbdVol.GetImageMirroringInfo() + info, err := mirror.GetMirroringInfo() if err != nil { log.ErrorLog(ctx, err.Error()) return nil, status.Error(codes.Internal, err.Error()) } - - switch mirroringInfo.State { + switch info.GetState() { // image is already in disabled state - case librbd.MirrorImageDisabled: + case librbd.MirrorImageDisabled.String(): // image mirroring is still disabling - case librbd.MirrorImageDisabling: + case librbd.MirrorImageDisabling.String(): return nil, status.Errorf(codes.Aborted, "%s is in disabling state", volumeID) - case librbd.MirrorImageEnabled: - err = rbdVol.DisableVolumeReplication(mirroringInfo, force) + case librbd.MirrorImageEnabled.String(): + err = corerbd.DisableVolumeReplication(mirror, info.IsPrimary(), force) if err != nil { return nil, getGRPCError(err) } return &replication.DisableVolumeReplicationResponse{}, nil default: - return nil, status.Errorf(codes.InvalidArgument, "image is in %s Mode", mirroringInfo.State) + return nil, status.Errorf(codes.InvalidArgument, "image is in %s Mode", info.GetState()) } return &replication.DisableVolumeReplicationResponse{}, nil @@ -422,48 +425,49 @@ func (rs *ReplicationServer) PromoteVolume(ctx context.Context, } defer rs.VolumeLocks.Release(volumeID) - rbdVol, err := corerbd.GenVolFromVolID(ctx, volumeID, cr, req.GetSecrets()) - defer func() { - if rbdVol != nil { - rbdVol.Destroy(ctx) - } - }() + mgr := rbd.NewManager(req.GetParameters(), req.GetSecrets()) + defer mgr.Destroy(ctx) + var mirror types.Mirror + var rbdVol types.Volume + + rbdVol, err = mgr.GetVolumeByID(ctx, volumeID) if err != nil { switch { case errors.Is(err, corerbd.ErrImageNotFound): err = status.Errorf(codes.NotFound, "volume %s not found", volumeID) case errors.Is(err, util.ErrPoolNotFound): - err = status.Errorf(codes.NotFound, "pool %s not found for %s", rbdVol.Pool, volumeID) + err = status.Errorf(codes.NotFound, "pool %s not found for %s", rbdVol.GetPoolName(), volumeID) default: err = status.Errorf(codes.Internal, err.Error()) } return nil, err } + mirror = rbdVol - mirroringInfo, err := rbdVol.GetImageMirroringInfo() + info, err := mirror.GetMirroringInfo() if err != nil { log.ErrorLog(ctx, err.Error()) return nil, status.Error(codes.Internal, err.Error()) } - if mirroringInfo.State != librbd.MirrorImageEnabled { + if info.GetState() != librbd.MirrorImageEnabled.String() { return nil, status.Errorf( codes.InvalidArgument, - "mirroring is not enabled on %s, image is in %d Mode", - rbdVol.VolID, - mirroringInfo.State) + "mirroring is not enabled on %s, image is in %s Mode", + volumeID, + info.GetState()) } // promote secondary to primary - if !mirroringInfo.Primary { + if !info.IsPrimary() { if req.GetForce() { // workaround for https://github.com/ceph/ceph-csi/issues/2736 // TODO: remove this workaround when the issue is fixed - err = rbdVol.ForcePromoteImage(cr) + err = mirror.ForcePromote(cr) } else { - err = rbdVol.PromoteImage(req.GetForce()) + err = mirror.Promote(req.GetForce()) } if err != nil { log.ErrorLog(ctx, err.Error()) @@ -483,7 +487,7 @@ func (rs *ReplicationServer) PromoteVolume(ctx context.Context, interval, startTime := getSchedulingDetails(req.GetParameters()) if interval != admin.NoInterval { - err = rbdVol.AddSnapshotScheduling(interval, startTime) + err = mirror.AddSnapshotScheduling(interval, startTime) if err != nil { return nil, err } @@ -522,49 +526,49 @@ func (rs *ReplicationServer) DemoteVolume(ctx context.Context, } defer rs.VolumeLocks.Release(volumeID) - rbdVol, err := corerbd.GenVolFromVolID(ctx, volumeID, cr, req.GetSecrets()) - defer func() { - if rbdVol != nil { - rbdVol.Destroy(ctx) - } - }() + mgr := rbd.NewManager(req.GetParameters(), req.GetSecrets()) + defer mgr.Destroy(ctx) + var mirror types.Mirror + var rbdVol types.Volume + + rbdVol, err = mgr.GetVolumeByID(ctx, volumeID) if err != nil { switch { case errors.Is(err, corerbd.ErrImageNotFound): err = status.Errorf(codes.NotFound, "volume %s not found", volumeID) case errors.Is(err, util.ErrPoolNotFound): - err = status.Errorf(codes.NotFound, "pool %s not found for %s", rbdVol.Pool, volumeID) + err = status.Errorf(codes.NotFound, "pool %s not found for %s", rbdVol.GetPoolName(), volumeID) default: err = status.Errorf(codes.Internal, err.Error()) } return nil, err } - - creationTime, err := rbdVol.GetImageCreationTime() + mirror = rbdVol + creationTime, err := rbdVol.GetCreationTime() if err != nil { log.ErrorLog(ctx, err.Error()) return nil, status.Error(codes.Internal, err.Error()) } - mirroringInfo, err := rbdVol.GetImageMirroringInfo() + info, err := mirror.GetMirroringInfo() if err != nil { log.ErrorLog(ctx, err.Error()) return nil, status.Error(codes.Internal, err.Error()) } - if mirroringInfo.State != librbd.MirrorImageEnabled { + if info.GetState() != librbd.MirrorImageEnabled.String() { return nil, status.Errorf( codes.InvalidArgument, - "mirroring is not enabled on %s, image is in %d Mode", - rbdVol.VolID, - mirroringInfo.State) + "mirroring is not enabled on %s, image is in %s Mode", + volumeID, + info.GetState()) } // demote image to secondary - if mirroringInfo.Primary { + if info.IsPrimary() { // store the image creation time for resync _, err = rbdVol.GetMetadata(imageCreationTimeKey) if err != nil && errors.Is(err, librbd.ErrNotFound) { @@ -577,7 +581,7 @@ func (rs *ReplicationServer) DemoteVolume(ctx context.Context, return nil, status.Error(codes.Internal, err.Error()) } - err = rbdVol.DemoteImage() + err = mirror.Demote() if err != nil { log.ErrorLog(ctx, err.Error()) @@ -590,22 +594,22 @@ func (rs *ReplicationServer) DemoteVolume(ctx context.Context, // checkRemoteSiteStatus checks the state of the remote cluster. // It returns true if the state of the remote cluster is up and unknown. -func checkRemoteSiteStatus(ctx context.Context, mirrorStatus *librbd.GlobalMirrorImageStatus) bool { +func checkRemoteSiteStatus(ctx context.Context, mirrorStatus []types.SiteStatus) bool { ready := true found := false - for _, s := range mirrorStatus.SiteStatuses { + for _, s := range mirrorStatus { log.UsefulLog( ctx, "peer site mirrorUUID=%q, daemon up=%t, mirroring state=%q, description=%q and lastUpdate=%d", - s.MirrorUUID, - s.Up, - s.State, - s.Description, - s.LastUpdate) - if s.MirrorUUID != "" { + s.GetMirrorUUID(), + s.IsUP(), + s.GetState(), + s.GetDescription(), + s.GetLastUpdate()) + if s.GetMirrorUUID() != "" { found = true // If ready is already "false" do not flip it based on another remote peer status - if ready && (s.State != librbd.MirrorImageStatusStateUnknown || !s.Up) { + if ready && (s.GetState() != librbd.MirrorImageStatusStateUnknown.String() || !s.IsUP()) { ready = false } } @@ -639,26 +643,27 @@ func (rs *ReplicationServer) ResyncVolume(ctx context.Context, return nil, status.Errorf(codes.Aborted, util.VolumeOperationAlreadyExistsFmt, volumeID) } defer rs.VolumeLocks.Release(volumeID) - rbdVol, err := corerbd.GenVolFromVolID(ctx, volumeID, cr, req.GetSecrets()) - defer func() { - if rbdVol != nil { - rbdVol.Destroy(ctx) - } - }() + mgr := rbd.NewManager(req.GetParameters(), req.GetSecrets()) + defer mgr.Destroy(ctx) + var mirror types.Mirror + var rbdVol types.Volume + + rbdVol, err = mgr.GetVolumeByID(ctx, volumeID) if err != nil { switch { case errors.Is(err, corerbd.ErrImageNotFound): err = status.Errorf(codes.NotFound, "volume %s not found", volumeID) case errors.Is(err, util.ErrPoolNotFound): - err = status.Errorf(codes.NotFound, "pool %s not found for %s", rbdVol.Pool, volumeID) + err = status.Errorf(codes.NotFound, "pool %s not found for %s", rbdVol.GetPoolName(), volumeID) default: err = status.Errorf(codes.Internal, err.Error()) } return nil, err } + mirror = rbdVol - mirroringInfo, err := rbdVol.GetImageMirroringInfo() + info, err := mirror.GetMirroringInfo() if err != nil { // in case of Resync the image will get deleted and gets recreated and // it takes time for this operation. @@ -667,22 +672,22 @@ func (rs *ReplicationServer) ResyncVolume(ctx context.Context, return nil, status.Errorf(codes.Aborted, err.Error()) } - if mirroringInfo.State != librbd.MirrorImageEnabled { + if info.GetState() != librbd.MirrorImageEnabled.String() { return nil, status.Error(codes.InvalidArgument, "image mirroring is not enabled") } // return error if the image is still primary - if mirroringInfo.Primary { + if info.IsPrimary() { return nil, status.Error(codes.InvalidArgument, "image is in primary state") } - mirrorStatus, err := rbdVol.GetImageMirroringStatus() + sts, err := mirror.GetGlobalMirroringStatus() if err != nil { // the image gets recreated after issuing resync if errors.Is(err, corerbd.ErrImageNotFound) { // caller retries till RBD syncs an initial version of the image to // report its status in the resync call. Ideally, this line will not - // be executed as the error would get returned due to getImageMirroringInfo + // be executed as the error would get returned due to getMirroringInfo // failing to find an image above. return nil, status.Error(codes.Aborted, err.Error()) } @@ -692,22 +697,20 @@ func (rs *ReplicationServer) ResyncVolume(ctx context.Context, } ready := false - localStatus, err := mirrorStatus.LocalStatus() + localStatus, err := sts.GetLocalSiteStatus() if err != nil { log.ErrorLog(ctx, err.Error()) return nil, fmt.Errorf("failed to get local status: %w", err) } - // convert the last update time to UTC - lastUpdateTime := time.Unix(localStatus.LastUpdate, 0).UTC() log.UsefulLog( ctx, "local status: daemon up=%t, image mirroring state=%q, description=%q and lastUpdate=%s", - localStatus.Up, - localStatus.State, - localStatus.Description, - lastUpdateTime) + localStatus.IsUP(), + localStatus.GetState(), + localStatus.GetDescription(), + localStatus.GetLastUpdate()) // To recover from split brain (up+error) state the image need to be // demoted and requested for resync on site-a and then the image on site-b @@ -719,11 +722,11 @@ func (rs *ReplicationServer) ResyncVolume(ctx context.Context, // If the image state on both the sites are up+unknown consider that // complete data is synced as the last snapshot // gets exchanged between the clusters. - if localStatus.State == librbd.MirrorImageStatusStateUnknown && localStatus.Up { - ready = checkRemoteSiteStatus(ctx, mirrorStatus) + if localStatus.GetState() == librbd.MirrorImageStatusStateUnknown.String() && localStatus.IsUP() { + ready = checkRemoteSiteStatus(ctx, sts.GetAllSitesStatus()) } - creationTime, err := rbdVol.GetImageCreationTime() + creationTime, err := rbdVol.GetCreationTime() if err != nil { return nil, status.Errorf(codes.Internal, "failed to get image info for %s: %s", rbdVol, err.Error()) } @@ -749,7 +752,7 @@ func (rs *ReplicationServer) ResyncVolume(ctx context.Context, } log.DebugLog(ctx, "image %s, savedImageTime=%v, currentImageTime=%v", rbdVol, st, creationTime.AsTime()) if req.GetForce() && st.Equal(creationTime.AsTime()) { - err = rbdVol.ResyncVol(localStatus) + err = mirror.Resync() if err != nil { return nil, getGRPCError(err) } @@ -853,42 +856,42 @@ func (rs *ReplicationServer) GetVolumeReplicationInfo(ctx context.Context, return nil, status.Errorf(codes.Aborted, util.VolumeOperationAlreadyExistsFmt, volumeID) } defer rs.VolumeLocks.Release(volumeID) - rbdVol, err := corerbd.GenVolFromVolID(ctx, volumeID, cr, req.GetSecrets()) - defer func() { - if rbdVol != nil { - rbdVol.Destroy(ctx) - } - }() - if err != nil { + mgr := rbd.NewManager(map[string]string{}, req.GetSecrets()) + defer mgr.Destroy(ctx) + var mirror types.Mirror + + rbdVol, rErr := mgr.GetVolumeByID(ctx, volumeID) + if rErr != nil { switch { - case errors.Is(err, corerbd.ErrImageNotFound): + case errors.Is(rErr, corerbd.ErrImageNotFound): err = status.Errorf(codes.NotFound, "volume %s not found", volumeID) - case errors.Is(err, util.ErrPoolNotFound): - err = status.Errorf(codes.NotFound, "pool %s not found for %s", rbdVol.Pool, volumeID) + case errors.Is(rErr, util.ErrPoolNotFound): + err = status.Errorf(codes.NotFound, "pool %s not found for %s", rbdVol.GetPoolName(), volumeID) default: - err = status.Errorf(codes.Internal, err.Error()) + err = status.Errorf(codes.Internal, rErr.Error()) } return nil, err } + mirror = rbdVol - mirroringInfo, err := rbdVol.GetImageMirroringInfo() + info, err := mirror.GetMirroringInfo() if err != nil { log.ErrorLog(ctx, err.Error()) return nil, status.Error(codes.Aborted, err.Error()) } - if mirroringInfo.State != librbd.MirrorImageEnabled { + if info.GetState() != librbd.MirrorImageEnabled.String() { return nil, status.Error(codes.InvalidArgument, "image mirroring is not enabled") } // return error if the image is not in primary state - if !mirroringInfo.Primary { + if !info.IsPrimary() { return nil, status.Error(codes.InvalidArgument, "image is not in primary state") } - mirrorStatus, err := rbdVol.GetImageMirroringStatus() + mirrorStatus, err := mirror.GetGlobalMirroringStatus() if err != nil { if errors.Is(err, corerbd.ErrImageNotFound) { return nil, status.Error(codes.Aborted, err.Error()) @@ -898,14 +901,14 @@ func (rs *ReplicationServer) GetVolumeReplicationInfo(ctx context.Context, return nil, status.Error(codes.Internal, err.Error()) } - remoteStatus, err := RemoteStatus(ctx, mirrorStatus) + remoteStatus, err := mirrorStatus.GetRemoteSiteStatus(ctx) if err != nil { log.ErrorLog(ctx, err.Error()) return nil, status.Errorf(codes.Internal, "failed to get remote status: %v", err) } - description := remoteStatus.Description + description := remoteStatus.GetDescription() resp, err := getLastSyncInfo(ctx, description) if err != nil { if errors.Is(err, corerbd.ErrLastSyncTimeNotFound) { @@ -919,36 +922,6 @@ func (rs *ReplicationServer) GetVolumeReplicationInfo(ctx context.Context, return resp, nil } -// RemoteStatus returns one SiteMirrorImageStatus item from the SiteStatuses -// slice that corresponds to the remote site's status. If the remote status -// is not found than the error ErrNotExist will be returned. -func RemoteStatus(ctx context.Context, gmis *librbd.GlobalMirrorImageStatus) (librbd.SiteMirrorImageStatus, error) { - var ( - ss librbd.SiteMirrorImageStatus - err error = librbd.ErrNotExist - ) - - for i := range gmis.SiteStatuses { - log.DebugLog( - ctx, - "Site status of MirrorUUID: %s, state: %s, description: %s, lastUpdate: %v, up: %t", - gmis.SiteStatuses[i].MirrorUUID, - gmis.SiteStatuses[i].State, - gmis.SiteStatuses[i].Description, - gmis.SiteStatuses[i].LastUpdate, - gmis.SiteStatuses[i].Up) - - if gmis.SiteStatuses[i].MirrorUUID != "" { - ss = gmis.SiteStatuses[i] - err = nil - - break - } - } - - return ss, err -} - // This function gets the local snapshot time, last sync snapshot seconds // and last sync bytes from the description of localStatus and convert // it into required types. @@ -1015,12 +988,12 @@ func getLastSyncInfo(ctx context.Context, description string) (*replication.GetV return &response, nil } -func checkVolumeResyncStatus(ctx context.Context, localStatus librbd.SiteMirrorImageStatus) error { +func checkVolumeResyncStatus(ctx context.Context, localStatus types.SiteStatus) error { // we are considering local snapshot timestamp to check if the resync is // started or not, if we dont see local_snapshot_timestamp in the // description of localStatus, we are returning error. if we see the local // snapshot timestamp in the description we return resyncing started. - description := localStatus.Description + description := localStatus.GetDescription() resp, err := getLastSyncInfo(ctx, description) if err != nil { return fmt.Errorf("failed to get last sync info: %w", err) diff --git a/internal/csi-addons/rbd/replication_test.go b/internal/csi-addons/rbd/replication_test.go index c678c6377236..6da1b929eea7 100644 --- a/internal/csi-addons/rbd/replication_test.go +++ b/internal/csi-addons/rbd/replication_test.go @@ -26,6 +26,7 @@ import ( "time" corerbd "github.com/ceph/ceph-csi/internal/rbd" + "github.com/ceph/ceph-csi/internal/rbd/types" librbd "github.com/ceph/go-ceph/rbd" "github.com/ceph/go-ceph/rbd/admin" @@ -219,30 +220,36 @@ func TestCheckVolumeResyncStatus(t *testing.T) { t.Parallel() tests := []struct { name string - args librbd.SiteMirrorImageStatus + args corerbd.SiteMirrorImageStatus wantErr bool }{ { name: "test when local_snapshot_timestamp is non zero", - args: librbd.SiteMirrorImageStatus{ - //nolint:lll // sample output cannot be split into multiple lines. - Description: `replaying, {"bytes_per_second":0.0,"bytes_per_snapshot":81920.0,"last_snapshot_bytes":81920,"last_snapshot_sync_seconds":56743,"local_snapshot_timestamp":1684675261,"remote_snapshot_timestamp":1684675261,"replay_state":"idle"}`, + args: corerbd.SiteMirrorImageStatus{ + SiteMirrorImageStatus: librbd.SiteMirrorImageStatus{ + //nolint:lll // sample output cannot be split into multiple lines. + Description: `replaying, {"bytes_per_second":0.0,"bytes_per_snapshot":81920.0,"last_snapshot_bytes":81920,"last_snapshot_sync_seconds":56743,"local_snapshot_timestamp":1684675261,"remote_snapshot_timestamp":1684675261,"replay_state":"idle"}`, + }, }, wantErr: false, }, { name: "test when local_snapshot_timestamp is zero", //nolint:lll // sample output cannot be split into multiple lines. - args: librbd.SiteMirrorImageStatus{ - Description: `replaying, {"bytes_per_second":0.0,"bytes_per_snapshot":81920.0,"last_snapshot_bytes":81920,"last_snapshot_sync_seconds":56743,"local_snapshot_timestamp":0,"remote_snapshot_timestamp":1684675261,"replay_state":"idle"}`, + args: corerbd.SiteMirrorImageStatus{ + SiteMirrorImageStatus: librbd.SiteMirrorImageStatus{ + Description: `replaying, {"bytes_per_second":0.0,"bytes_per_snapshot":81920.0,"last_snapshot_bytes":81920,"last_snapshot_sync_seconds":56743,"local_snapshot_timestamp":0,"remote_snapshot_timestamp":1684675261,"replay_state":"idle"}`, + }, }, wantErr: true, }, { name: "test when local_snapshot_timestamp is not present", //nolint:lll // sample output cannot be split into multiple lines. - args: librbd.SiteMirrorImageStatus{ - Description: `replaying, {"bytes_per_second":0.0,"bytes_per_snapshot":81920.0,"last_snapshot_bytes":81920,"last_snapshot_sync_seconds":56743,"remote_snapshot_timestamp":1684675261,"replay_state":"idle"}`, + args: corerbd.SiteMirrorImageStatus{ + SiteMirrorImageStatus: librbd.SiteMirrorImageStatus{ + Description: `replaying, {"bytes_per_second":0.0,"bytes_per_snapshot":81920.0,"last_snapshot_bytes":81920,"last_snapshot_sync_seconds":56743,"remote_snapshot_timestamp":1684675261,"replay_state":"idle"}`, + }, }, wantErr: true, }, @@ -261,17 +268,19 @@ func TestCheckRemoteSiteStatus(t *testing.T) { t.Parallel() tests := []struct { name string - args *librbd.GlobalMirrorImageStatus + args corerbd.GlobalMirrorStatus wantReady bool }{ { name: "Test a single peer in sync", - args: &librbd.GlobalMirrorImageStatus{ - SiteStatuses: []librbd.SiteMirrorImageStatus{ - { - MirrorUUID: "remote", - State: librbd.MirrorImageStatusStateUnknown, - Up: true, + args: corerbd.GlobalMirrorStatus{ + GlobalMirrorImageStatus: librbd.GlobalMirrorImageStatus{ + SiteStatuses: []librbd.SiteMirrorImageStatus{ + { + MirrorUUID: "remote", + State: librbd.MirrorImageStatusStateUnknown, + Up: true, + }, }, }, }, @@ -279,17 +288,19 @@ func TestCheckRemoteSiteStatus(t *testing.T) { }, { name: "Test a single peer in sync, including a local instance", - args: &librbd.GlobalMirrorImageStatus{ - SiteStatuses: []librbd.SiteMirrorImageStatus{ - { - MirrorUUID: "remote", - State: librbd.MirrorImageStatusStateUnknown, - Up: true, - }, - { - MirrorUUID: "", - State: librbd.MirrorImageStatusStateUnknown, - Up: true, + args: corerbd.GlobalMirrorStatus{ + GlobalMirrorImageStatus: librbd.GlobalMirrorImageStatus{ + SiteStatuses: []librbd.SiteMirrorImageStatus{ + { + MirrorUUID: "remote", + State: librbd.MirrorImageStatusStateUnknown, + Up: true, + }, + { + MirrorUUID: "", + State: librbd.MirrorImageStatusStateUnknown, + Up: true, + }, }, }, }, @@ -297,17 +308,19 @@ func TestCheckRemoteSiteStatus(t *testing.T) { }, { name: "Test a multiple peers in sync", - args: &librbd.GlobalMirrorImageStatus{ - SiteStatuses: []librbd.SiteMirrorImageStatus{ - { - MirrorUUID: "remote1", - State: librbd.MirrorImageStatusStateUnknown, - Up: true, - }, - { - MirrorUUID: "remote2", - State: librbd.MirrorImageStatusStateUnknown, - Up: true, + args: corerbd.GlobalMirrorStatus{ + GlobalMirrorImageStatus: librbd.GlobalMirrorImageStatus{ + SiteStatuses: []librbd.SiteMirrorImageStatus{ + { + MirrorUUID: "remote1", + State: librbd.MirrorImageStatusStateUnknown, + Up: true, + }, + { + MirrorUUID: "remote2", + State: librbd.MirrorImageStatusStateUnknown, + Up: true, + }, }, }, }, @@ -315,19 +328,23 @@ func TestCheckRemoteSiteStatus(t *testing.T) { }, { name: "Test no remote peers", - args: &librbd.GlobalMirrorImageStatus{ - SiteStatuses: []librbd.SiteMirrorImageStatus{}, + args: corerbd.GlobalMirrorStatus{ + GlobalMirrorImageStatus: librbd.GlobalMirrorImageStatus{ + SiteStatuses: []librbd.SiteMirrorImageStatus{}, + }, }, wantReady: false, }, { name: "Test single peer not in sync", - args: &librbd.GlobalMirrorImageStatus{ - SiteStatuses: []librbd.SiteMirrorImageStatus{ - { - MirrorUUID: "remote", - State: librbd.MirrorImageStatusStateReplaying, - Up: true, + args: corerbd.GlobalMirrorStatus{ + GlobalMirrorImageStatus: librbd.GlobalMirrorImageStatus{ + SiteStatuses: []librbd.SiteMirrorImageStatus{ + { + MirrorUUID: "remote", + State: librbd.MirrorImageStatusStateReplaying, + Up: true, + }, }, }, }, @@ -335,12 +352,14 @@ func TestCheckRemoteSiteStatus(t *testing.T) { }, { name: "Test single peer not up", - args: &librbd.GlobalMirrorImageStatus{ - SiteStatuses: []librbd.SiteMirrorImageStatus{ - { - MirrorUUID: "remote", - State: librbd.MirrorImageStatusStateUnknown, - Up: false, + args: corerbd.GlobalMirrorStatus{ + GlobalMirrorImageStatus: librbd.GlobalMirrorImageStatus{ + SiteStatuses: []librbd.SiteMirrorImageStatus{ + { + MirrorUUID: "remote", + State: librbd.MirrorImageStatusStateUnknown, + Up: false, + }, }, }, }, @@ -348,17 +367,19 @@ func TestCheckRemoteSiteStatus(t *testing.T) { }, { name: "Test multiple peers, when first peer is not in sync", - args: &librbd.GlobalMirrorImageStatus{ - SiteStatuses: []librbd.SiteMirrorImageStatus{ - { - MirrorUUID: "remote1", - State: librbd.MirrorImageStatusStateStoppingReplay, - Up: true, - }, - { - MirrorUUID: "remote2", - State: librbd.MirrorImageStatusStateUnknown, - Up: true, + args: corerbd.GlobalMirrorStatus{ + GlobalMirrorImageStatus: librbd.GlobalMirrorImageStatus{ + SiteStatuses: []librbd.SiteMirrorImageStatus{ + { + MirrorUUID: "remote1", + State: librbd.MirrorImageStatusStateStoppingReplay, + Up: true, + }, + { + MirrorUUID: "remote2", + State: librbd.MirrorImageStatusStateUnknown, + Up: true, + }, }, }, }, @@ -366,17 +387,19 @@ func TestCheckRemoteSiteStatus(t *testing.T) { }, { name: "Test multiple peers, when second peer is not up", - args: &librbd.GlobalMirrorImageStatus{ - SiteStatuses: []librbd.SiteMirrorImageStatus{ - { - MirrorUUID: "remote1", - State: librbd.MirrorImageStatusStateUnknown, - Up: true, - }, - { - MirrorUUID: "remote2", - State: librbd.MirrorImageStatusStateUnknown, - Up: false, + args: corerbd.GlobalMirrorStatus{ + GlobalMirrorImageStatus: librbd.GlobalMirrorImageStatus{ + SiteStatuses: []librbd.SiteMirrorImageStatus{ + { + MirrorUUID: "remote1", + State: librbd.MirrorImageStatusStateUnknown, + Up: true, + }, + { + MirrorUUID: "remote2", + State: librbd.MirrorImageStatusStateUnknown, + Up: false, + }, }, }, }, @@ -386,7 +409,7 @@ func TestCheckRemoteSiteStatus(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { t.Parallel() - if ready := checkRemoteSiteStatus(context.TODO(), tt.args); ready != tt.wantReady { + if ready := checkRemoteSiteStatus(context.TODO(), tt.args.GetAllSitesStatus()); ready != tt.wantReady { t.Errorf("checkRemoteSiteStatus() ready = %v, expect ready = %v", ready, tt.wantReady) } }) @@ -651,7 +674,7 @@ func Test_getFlattenMode(t *testing.T) { tests := []struct { name string args args - want corerbd.FlattenMode + want types.FlattenMode wantErr bool }{ { @@ -660,27 +683,27 @@ func Test_getFlattenMode(t *testing.T) { ctx: context.TODO(), parameters: map[string]string{}, }, - want: corerbd.FlattenModeNever, + want: types.FlattenModeNever, }, { name: "flattenMode option set to never", args: args{ ctx: context.TODO(), parameters: map[string]string{ - flattenModeKey: string(corerbd.FlattenModeNever), + flattenModeKey: string(types.FlattenModeNever), }, }, - want: corerbd.FlattenModeNever, + want: types.FlattenModeNever, }, { name: "flattenMode option set to force", args: args{ ctx: context.TODO(), parameters: map[string]string{ - flattenModeKey: string(corerbd.FlattenModeForce), + flattenModeKey: string(types.FlattenModeForce), }, }, - want: corerbd.FlattenModeForce, + want: types.FlattenModeForce, }, { diff --git a/internal/rbd/controllerserver.go b/internal/rbd/controllerserver.go index 1f32e6dec7e6..04aafaca085c 100644 --- a/internal/rbd/controllerserver.go +++ b/internal/rbd/controllerserver.go @@ -981,7 +981,7 @@ func (cs *ControllerServer) DeleteVolume( func cleanupRBDImage(ctx context.Context, rbdVol *rbdVolume, cr *util.Credentials, ) (*csi.DeleteVolumeResponse, error) { - mirroringInfo, err := rbdVol.GetImageMirroringInfo() + info, err := rbdVol.GetMirroringInfo() if err != nil { log.ErrorLog(ctx, err.Error()) @@ -991,7 +991,7 @@ func cleanupRBDImage(ctx context.Context, // Mirroring is enabled on the image // Local image is secondary // Local image is in up+replaying state - if mirroringInfo.State == librbd.MirrorImageEnabled && !mirroringInfo.Primary { + if info.GetState() == librbd.MirrorImageEnabled.String() && !info.IsPrimary() { // If the image is in a secondary state and its up+replaying means its // an healthy secondary and the image is primary somewhere in the // remote cluster and the local image is getting replayed. Delete the @@ -1000,11 +1000,18 @@ func cleanupRBDImage(ctx context.Context, // the image on all the remote (secondary) clusters will get // auto-deleted. This helps in garbage collecting the OMAP, PVC and PV // objects after failback operation. - localStatus, rErr := rbdVol.GetLocalState() + sts, rErr := rbdVol.GetGlobalMirroringStatus() if rErr != nil { return nil, status.Error(codes.Internal, rErr.Error()) } - if localStatus.Up && localStatus.State == librbd.MirrorImageStatusStateReplaying { + + localStatus, rErr := sts.GetLocalSiteStatus() + if rErr != nil { + log.ErrorLog(ctx, "failed to get local status for volume %s: %w", rbdVol.RbdImageName, rErr) + + return nil, status.Error(codes.Internal, rErr.Error()) + } + if localStatus.IsUP() && localStatus.GetState() == librbd.MirrorImageStatusStateReplaying.String() { if err = undoVolReservation(ctx, rbdVol, cr); err != nil { log.ErrorLog(ctx, "failed to remove reservation for volume (%s) with backing image (%s) (%s)", rbdVol.RequestName, rbdVol.RbdImageName, err) @@ -1016,8 +1023,8 @@ func cleanupRBDImage(ctx context.Context, } log.ErrorLog(ctx, "secondary image status is up=%t and state=%s", - localStatus.Up, - localStatus.State) + localStatus.IsUP(), + localStatus.GetState()) } inUse, err := rbdVol.isInUse() diff --git a/internal/rbd/mirror.go b/internal/rbd/mirror.go index 106a9eb77769..6767e8e8781c 100644 --- a/internal/rbd/mirror.go +++ b/internal/rbd/mirror.go @@ -20,21 +20,13 @@ import ( "fmt" "time" + "github.com/ceph/ceph-csi/internal/rbd/types" "github.com/ceph/ceph-csi/internal/util" + "github.com/ceph/ceph-csi/internal/util/log" librbd "github.com/ceph/go-ceph/rbd" ) -// FlattenMode is used to indicate the flatten mode for an RBD image. -type FlattenMode string - -const ( - // FlattenModeNever indicates that the image should never be flattened. - FlattenModeNever FlattenMode = "never" - // FlattenModeForce indicates that the image with the parent must be flattened. - FlattenModeForce FlattenMode = "force" -) - // HandleParentImageExistence checks the image's parent. // if the parent image does not exist and is not in trash, it returns nil. // if the flattenMode is FlattenModeForce, it flattens the image itself. @@ -42,13 +34,12 @@ const ( // if the parent image exists and is not enabled for mirroring, it returns an error. func (rv *rbdVolume) HandleParentImageExistence( ctx context.Context, - flattenMode FlattenMode, + mode types.FlattenMode, ) error { if rv.ParentName == "" && !rv.ParentInTrash { return nil } - - if flattenMode == FlattenModeForce { + if mode == types.FlattenModeForce { // Delete temp image that exists for volume datasource since // it is no longer required when the live image is flattened. err := rv.DeleteTempImage(ctx) @@ -72,14 +63,13 @@ func (rv *rbdVolume) HandleParentImageExistence( if err != nil { return err } - parentMirroringInfo, err := parent.GetImageMirroringInfo() + parentMirroringInfo, err := parent.GetMirroringInfo() if err != nil { return fmt.Errorf( "failed to get mirroring info of parent %q of image %q: %w", parent, rv, err) } - - if parentMirroringInfo.State != librbd.MirrorImageEnabled { + if parentMirroringInfo.GetState() != librbd.MirrorImageEnabled.String() { return fmt.Errorf("%w: failed to enable mirroring on image %q: "+ "parent image %q is not enabled for mirroring", ErrFailedPrecondition, rv, parent) @@ -88,8 +78,11 @@ func (rv *rbdVolume) HandleParentImageExistence( return nil } -// EnableImageMirroring enables mirroring on an image. -func (ri *rbdImage) EnableImageMirroring(mode librbd.ImageMirrorMode) error { +// check that rbdVolume implements the types.Mirror interface. +var _ types.Mirror = &rbdVolume{} + +// EnableMirroring enables mirroring on an image. +func (ri *rbdImage) EnableMirroring(mode librbd.ImageMirrorMode) error { image, err := ri.open() if err != nil { return fmt.Errorf("failed to open image %q with error: %w", ri, err) @@ -104,8 +97,8 @@ func (ri *rbdImage) EnableImageMirroring(mode librbd.ImageMirrorMode) error { return nil } -// DisableImageMirroring disables mirroring on an image. -func (ri *rbdImage) DisableImageMirroring(force bool) error { +// DisableMirroring disables mirroring on an image. +func (ri *rbdImage) DisableMirroring(force bool) error { image, err := ri.open() if err != nil { return fmt.Errorf("failed to open image %q with error: %w", ri, err) @@ -120,8 +113,8 @@ func (ri *rbdImage) DisableImageMirroring(force bool) error { return nil } -// GetImageMirroringInfo gets mirroring information of an image. -func (ri *rbdImage) GetImageMirroringInfo() (*librbd.MirrorImageInfo, error) { +// GetMirroringInfo gets mirroring information of an image. +func (ri *rbdImage) GetMirroringInfo() (types.MirrorInfo, error) { image, err := ri.open() if err != nil { return nil, fmt.Errorf("failed to open image %q with error: %w", ri, err) @@ -133,11 +126,11 @@ func (ri *rbdImage) GetImageMirroringInfo() (*librbd.MirrorImageInfo, error) { return nil, fmt.Errorf("failed to get mirroring info of %q with error: %w", ri, err) } - return info, nil + return ImageStatus{MirrorImageInfo: info}, nil } -// PromoteImage promotes image to primary. -func (ri *rbdImage) PromoteImage(force bool) error { +// Promote promotes image to primary. +func (ri *rbdImage) Promote(force bool) error { image, err := ri.open() if err != nil { return fmt.Errorf("failed to open image %q with error: %w", ri, err) @@ -151,10 +144,10 @@ func (ri *rbdImage) PromoteImage(force bool) error { return nil } -// ForcePromoteImage promotes image to primary with force option with 2 minutes +// ForcePromote promotes image to primary with force option with 2 minutes // timeout. If there is no response within 2 minutes,the rbd CLI process will be // killed and an error is returned. -func (rv *rbdVolume) ForcePromoteImage(cr *util.Credentials) error { +func (rv *rbdVolume) ForcePromote(cr *util.Credentials) error { promoteArgs := []string{ "mirror", "image", "promote", rv.String(), @@ -181,8 +174,8 @@ func (rv *rbdVolume) ForcePromoteImage(cr *util.Credentials) error { return nil } -// DemoteImage demotes image to secondary. -func (ri *rbdImage) DemoteImage() error { +// Demote demotes image to secondary. +func (ri *rbdImage) Demote() error { image, err := ri.open() if err != nil { return fmt.Errorf("failed to open image %q with error: %w", ri, err) @@ -196,8 +189,8 @@ func (ri *rbdImage) DemoteImage() error { return nil } -// resyncImage resync image to correct the split-brain. -func (ri *rbdImage) resyncImage() error { +// Resync resync image to correct the split-brain. +func (ri *rbdImage) Resync() error { image, err := ri.open() if err != nil { return fmt.Errorf("failed to open image %q with error: %w", ri, err) @@ -208,11 +201,14 @@ func (ri *rbdImage) resyncImage() error { return fmt.Errorf("failed to resync image %q with error: %w", ri, err) } - return nil + // If we issued a resync, return a non-final error as image needs to be recreated + // locally. Caller retries till RBD syncs an initial version of the image to + // report its status in the resync request. + return fmt.Errorf("%w: awaiting initial resync due to split brain", ErrUnavailable) } -// GetImageMirroringStatus get the mirroring status of an image. -func (ri *rbdImage) GetImageMirroringStatus() (*librbd.GlobalMirrorImageStatus, error) { +// GetGlobalMirroringStatus get the mirroring status of an image. +func (ri *rbdImage) GetGlobalMirroringStatus() (types.GlobalStatus, error) { image, err := ri.open() if err != nil { return nil, fmt.Errorf("failed to open image %q with error: %w", ri, err) @@ -223,26 +219,101 @@ func (ri *rbdImage) GetImageMirroringStatus() (*librbd.GlobalMirrorImageStatus, return nil, fmt.Errorf("failed to get image mirroring status %q with error: %w", ri, err) } - return &statusInfo, nil + return GlobalMirrorStatus{GlobalMirrorImageStatus: statusInfo}, nil } -// GetLocalState returns the local state of the image. -func (ri *rbdImage) GetLocalState() (librbd.SiteMirrorImageStatus, error) { - localStatus := librbd.SiteMirrorImageStatus{} - image, err := ri.open() - if err != nil { - return localStatus, fmt.Errorf("failed to open image %q with error: %w", ri, err) - } - defer image.Close() +type ImageStatus struct { + *librbd.MirrorImageInfo +} - statusInfo, err := image.GetGlobalMirrorStatus() - if err != nil { - return localStatus, fmt.Errorf("failed to get image mirroring status %q with error: %w", ri, err) +func (status ImageStatus) GetState() string { + return status.State.String() +} + +func (status ImageStatus) IsPrimary() bool { + return status.Primary +} + +type GlobalMirrorStatus struct { + librbd.GlobalMirrorImageStatus +} + +func (status GlobalMirrorStatus) GetState() string { + return status.GlobalMirrorImageStatus.Info.State.String() +} + +func (status GlobalMirrorStatus) IsPrimary() bool { + return status.GlobalMirrorImageStatus.Info.Primary +} + +func (status GlobalMirrorStatus) GetLocalSiteStatus() (types.SiteStatus, error) { + s, err := status.GlobalMirrorImageStatus.LocalStatus() + + return SiteMirrorImageStatus{ + SiteMirrorImageStatus: s, + }, fmt.Errorf("failed to get local site status: %w", err) +} + +func (status GlobalMirrorStatus) GetAllSitesStatus() []types.SiteStatus { + var siteStatuses []types.SiteStatus + for _, ss := range status.SiteStatuses { + siteStatuses = append(siteStatuses, SiteMirrorImageStatus{SiteMirrorImageStatus: ss}) } - localStatus, err = statusInfo.LocalStatus() - if err != nil { - return localStatus, fmt.Errorf("failed to get local status: %w", err) + + return siteStatuses +} + +// RemoteStatus returns one SiteMirrorImageStatus item from the SiteStatuses +// slice that corresponds to the remote site's status. If the remote status +// is not found than the error ErrNotExist will be returned. +func (status GlobalMirrorStatus) GetRemoteSiteStatus(ctx context.Context) (types.SiteStatus, error) { + var ( + ss librbd.SiteMirrorImageStatus + err error = librbd.ErrNotExist + ) + + for i := range status.SiteStatuses { + log.DebugLog( + ctx, + "Site status of MirrorUUID: %s, state: %s, description: %s, lastUpdate: %v, up: %t", + status.SiteStatuses[i].MirrorUUID, + status.SiteStatuses[i].State, + status.SiteStatuses[i].Description, + status.SiteStatuses[i].LastUpdate, + status.SiteStatuses[i].Up) + + if status.SiteStatuses[i].MirrorUUID != "" { + ss = status.SiteStatuses[i] + err = nil + + break + } } - return localStatus, nil + return SiteMirrorImageStatus{SiteMirrorImageStatus: ss}, err +} + +type SiteMirrorImageStatus struct { + librbd.SiteMirrorImageStatus +} + +func (status SiteMirrorImageStatus) GetMirrorUUID() string { + return status.MirrorUUID +} + +func (status SiteMirrorImageStatus) GetState() string { + return status.State.String() +} + +func (status SiteMirrorImageStatus) GetDescription() string { + return status.Description +} + +func (status SiteMirrorImageStatus) IsUP() bool { + return status.Up +} + +func (status SiteMirrorImageStatus) GetLastUpdate() time.Time { + // convert the last update time to UTC + return time.Unix(status.LastUpdate, 0).UTC() } diff --git a/internal/rbd/rbd_util.go b/internal/rbd/rbd_util.go index 884d66a2b2d4..28437a921346 100644 --- a/internal/rbd/rbd_util.go +++ b/internal/rbd/rbd_util.go @@ -413,6 +413,10 @@ func (ri *rbdImage) String() string { return fmt.Sprintf("%s/%s", ri.Pool, ri.RbdImageName) } +func (ri *rbdImage) GetPoolName() string { + return ri.Pool +} + // String returns the snap-spec (pool/{namespace/}image@snap) format of the snapshot. func (rs *rbdSnapshot) String() string { if rs.RadosNamespace != "" { @@ -1594,9 +1598,9 @@ func (rv *rbdVolume) setImageOptions(ctx context.Context, options *librbd.ImageO return nil } -// GetImageCreationTime returns the creation time of the image. if the image +// GetCreationTime returns the creation time of the image. if the image // creation time is not set, it queries the image info and returns the creation time. -func (ri *rbdImage) GetImageCreationTime() (*timestamppb.Timestamp, error) { +func (ri *rbdImage) GetCreationTime() (*timestamppb.Timestamp, error) { if ri.CreatedAt != nil { return ri.CreatedAt, nil } diff --git a/internal/rbd/replication.go b/internal/rbd/replication.go index c6b4c55ddcc9..86c31cd85fe8 100644 --- a/internal/rbd/replication.go +++ b/internal/rbd/replication.go @@ -20,20 +20,11 @@ import ( "context" "fmt" + "github.com/ceph/ceph-csi/internal/rbd/types" + librbd "github.com/ceph/go-ceph/rbd" ) -func (rv *rbdVolume) ResyncVol(localStatus librbd.SiteMirrorImageStatus) error { - if err := rv.resyncImage(); err != nil { - return fmt.Errorf("failed to resync image: %w", err) - } - - // If we issued a resync, return a non-final error as image needs to be recreated - // locally. Caller retries till RBD syncs an initial version of the image to - // report its status in the resync request. - return fmt.Errorf("%w: awaiting initial resync due to split brain", ErrUnavailable) -} - // repairResyncedImageID updates the existing image ID with new one. func (rv *rbdVolume) RepairResyncedImageID(ctx context.Context, ready bool) error { // During resync operation the local image will get deleted and a new @@ -54,11 +45,11 @@ func (rv *rbdVolume) RepairResyncedImageID(ctx context.Context, ready bool) erro return rv.repairImageID(ctx, j, true) } -func (rv *rbdVolume) DisableVolumeReplication( - mirroringInfo *librbd.MirrorImageInfo, +func DisableVolumeReplication(mirror types.Mirror, + primary, force bool, ) error { - if !mirroringInfo.Primary { + if !primary { // Return success if the below condition is met // Local image is secondary // Local image is in up+replaying state @@ -71,29 +62,35 @@ func (rv *rbdVolume) DisableVolumeReplication( // disabled the image on all the remote (secondary) clusters will get // auto-deleted. This helps in garbage collecting the volume // replication Kubernetes artifacts after failback operation. - localStatus, rErr := rv.GetLocalState() + sts, rErr := mirror.GetGlobalMirroringStatus() if rErr != nil { - return fmt.Errorf("failed to get local state: %w", rErr) + return fmt.Errorf("failed to get global state: %w", rErr) + } + + localStatus, err := sts.GetLocalSiteStatus() + if err != nil { + return fmt.Errorf("failed to get local state: %w", ErrInvalidArgument) } - if localStatus.Up && localStatus.State == librbd.MirrorImageStatusStateReplaying { + if localStatus.IsUP() && localStatus.GetState() == librbd.MirrorImageStatusStateReplaying.String() { return nil } return fmt.Errorf("%w: secondary image status is up=%t and state=%s", - ErrInvalidArgument, localStatus.Up, localStatus.State) + ErrInvalidArgument, localStatus.IsUP(), localStatus.GetState()) } - err := rv.DisableImageMirroring(force) + err := mirror.DisableMirroring(force) if err != nil { return fmt.Errorf("failed to disable image mirroring: %w", err) } // the image state can be still disabling once we disable the mirroring // check the mirroring is disabled or not - mirroringInfo, err = rv.GetImageMirroringInfo() + info, err := mirror.GetMirroringInfo() if err != nil { return fmt.Errorf("failed to get mirroring info of image: %w", err) } - if mirroringInfo.State == librbd.MirrorImageDisabling { - return fmt.Errorf("%w: %q is in disabling state", ErrAborted, rv.VolID) + + if info.GetState() == librbd.MirrorImageDisabling.String() { + return fmt.Errorf("%w: image is in disabling state", ErrAborted) } return nil diff --git a/internal/rbd/types/group.go b/internal/rbd/types/group.go index 3beb4e9530cb..ce64a605307c 100644 --- a/internal/rbd/types/group.go +++ b/internal/rbd/types/group.go @@ -45,4 +45,6 @@ type VolumeGroup interface { // ListVolumes returns a slice with all Volumes in the VolumeGroup. ListVolumes(ctx context.Context) ([]Volume, error) + + Mirror } diff --git a/internal/rbd/types/mirror.go b/internal/rbd/types/mirror.go new file mode 100644 index 000000000000..2bbfb74fcc4b --- /dev/null +++ b/internal/rbd/types/mirror.go @@ -0,0 +1,77 @@ +/* +Copyright 2024 The Ceph-CSI Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package types + +import ( + "context" + "time" + + "github.com/ceph/ceph-csi/internal/util" + + librbd "github.com/ceph/go-ceph/rbd" + "github.com/ceph/go-ceph/rbd/admin" +) + +// FlattenMode is used to indicate the flatten mode for an RBD image. +type FlattenMode string + +const ( + // FlattenModeNever indicates that the image should never be flattened. + FlattenModeNever FlattenMode = "never" + // FlattenModeForce indicates that the image with the parent must be flattened. + FlattenModeForce FlattenMode = "force" +) + +type Mirror interface { + // EnableMirroring enables mirroring on the resource with the specified mode. + EnableMirroring(mode librbd.ImageMirrorMode) error + // DisableMirroring disables mirroring on the resource with the option to force the operation + DisableMirroring(force bool) error + // Promote promotes the resource to primary status with the option to force the operation + Promote(force bool) error + // ForcePromote promotes the resource to primary status with a timeout + ForcePromote(cr *util.Credentials) error + // Demote demotes the resource to secondary status + Demote() error + // Resync resynchronizes the resource + Resync() error + // GetMirroringInfo returns the mirroring information of the resource + GetMirroringInfo() (MirrorInfo, error) + // GetMirroringInfo returns the mirroring information of the resource + GetGlobalMirroringStatus() (GlobalStatus, error) + // AddSnapshotScheduling adds a snapshot scheduling to the resource + AddSnapshotScheduling(interval admin.Interval, startTime admin.StartTime) error +} + +type MirrorInfo interface { + IsPrimary() bool + GetState() string +} + +type GlobalStatus interface { + MirrorInfo + GetLocalSiteStatus() (SiteStatus, error) + GetAllSitesStatus() []SiteStatus + GetRemoteSiteStatus(ctx context.Context) (SiteStatus, error) +} +type SiteStatus interface { + GetMirrorUUID() string + IsUP() bool + GetState() string + GetDescription() string + GetLastUpdate() time.Time +} diff --git a/internal/rbd/types/volume.go b/internal/rbd/types/volume.go index 99961a733828..7f6114444b6a 100644 --- a/internal/rbd/types/volume.go +++ b/internal/rbd/types/volume.go @@ -20,8 +20,10 @@ import ( "context" "github.com/container-storage-interface/spec/lib/go/csi" + "google.golang.org/protobuf/types/known/timestamppb" ) +//nolint:interfacebloat // more than 10 methods are needed for the interface type Volume interface { // Destroy frees the resources used by the Volume. Destroy(ctx context.Context) @@ -34,4 +36,23 @@ type Volume interface { // ToCSI creates a CSI protocol formatted struct of the volume. ToCSI(ctx context.Context) *csi.Volume + + // GetPoolName returns the name of the pool where the volume is stored. + GetPoolName() string + // GetCreationTime returns the creation time of the volume. + GetCreationTime() (*timestamppb.Timestamp, error) + // GetMetadata returns the value of the metadata key from the volume. + GetMetadata(key string) (string, error) + // SetMetadata sets the value of the metadata key on the volume. + SetMetadata(key, value string) error + // RepairResyncedImageID updates the existing image ID with new one in OMAP. + RepairResyncedImageID(ctx context.Context, ready bool) error + // HandleParentImageExistence checks the image's parent. + // if the parent image does not exist and is not in trash, it returns nil. + // if the flattenMode is FlattenModeForce, it flattens the image itself. + // if the parent image is in trash, it returns an error. + // if the parent image exists and is not enabled for mirroring, it returns an error. + HandleParentImageExistence(ctx context.Context, flattenMode FlattenMode) error + + Mirror }