Skip to content

Commit

Permalink
cephfs: Add CephFS snapshot create/delete support
Browse files Browse the repository at this point in the history
Signed-off-by: Humble Chirammal <hchiramm@redhat.com>
  • Loading branch information
humblec committed Jul 8, 2020
1 parent 82080dc commit 15a1cad
Show file tree
Hide file tree
Showing 7 changed files with 734 additions and 1 deletion.
220 changes: 220 additions & 0 deletions internal/cephfs/controllerserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,12 @@ package cephfs
import (
"context"
"errors"
"time"

csicommon "github.com/ceph/ceph-csi/internal/csi-common"
"github.com/ceph/ceph-csi/internal/util"
"github.com/golang/protobuf/ptypes"
"github.com/kubernetes-csi/csi-lib-utils/protosanitizer"

"github.com/container-storage-interface/spec/lib/go/csi"
"google.golang.org/grpc/codes"
Expand All @@ -37,6 +40,10 @@ type ControllerServer struct {
// A map storing all volumes with ongoing operations so that additional operations
// for that same volume (as defined by VolumeID/volume name) return an Aborted error
VolumeLocks *util.VolumeLocks

// A map storing all volumes with ongoing operations so that additional operations
// for that same snapshot (as defined by SnapshotID/snapshot name) return an Aborted error
SnapshotLocks *util.VolumeLocks
}

type controllerCacheEntry struct {
Expand Down Expand Up @@ -377,3 +384,216 @@ func (cs *ControllerServer) ControllerExpandVolume(ctx context.Context, req *csi
NodeExpansionRequired: false,
}, nil
}

// CreateSnapshot creates the snapshot in backend and stores metadata
// in store
// nolint: gocyclo
func (cs *ControllerServer) CreateSnapshot(ctx context.Context, req *csi.CreateSnapshotRequest) (*csi.CreateSnapshotResponse, error) {
if err := cs.validateSnapshotReq(ctx, req); err != nil {
return nil, err
}
requestName := req.GetName()
// Existence and conflict checks
if acquired := cs.SnapshotLocks.TryAcquire(requestName); !acquired {
klog.Errorf(util.Log(ctx, util.SnapshotOperationAlreadyExistsFmt), requestName)
return nil, status.Errorf(codes.Aborted, util.SnapshotOperationAlreadyExistsFmt, requestName)
}
defer cs.SnapshotLocks.Release(requestName)

// TODO take lock on parent subvolume

sourceVolID := req.GetSourceVolumeId()
// Find the volume using the provided VolumeID
volOptions, vid, err := newVolumeOptionsFromVolID(ctx, sourceVolID, nil, req.GetSecrets())
if err != nil {
var epnf util.ErrPoolNotFound
if errors.As(err, &epnf) {
klog.Warningf(util.Log(ctx, "failed to get backend volume for %s: %v"), sourceVolID, err)
return nil, status.Error(codes.NotFound, err.Error())
}
var evnf ErrVolumeNotFound
if errors.As(err, &evnf) {
return nil, status.Error(codes.NotFound, err.Error())
}
return nil, status.Error(codes.Internal, err.Error())
}
// TODO check snapshot exists

volOptions.RequestName = req.GetName()
// Reservation
snapInfo, err := checkSnapExists(ctx, volOptions, vid.FsSubvolName, req.GetSecrets())
if err != nil {
return nil, status.Error(codes.Internal, err.Error())
}
if snapInfo != nil {
return &csi.CreateSnapshotResponse{
Snapshot: &csi.Snapshot{
SizeBytes: volOptions.Size,
SnapshotId: snapInfo.ID,
SourceVolumeId: req.GetSourceVolumeId(),
CreationTime: snapInfo.CreationTime,
ReadyToUse: true,
},
}, nil
}
sID, err := reserveSnap(ctx, volOptions, vid.FsSubvolName, req.GetSecrets())
if err != nil {
return nil, status.Error(codes.Internal, err.Error())
}
defer func() {
if err != nil {
errDefer := undoSnapReservation(ctx, volOptions, *sID, req.GetSecrets())
if errDefer != nil {
klog.Warningf(util.Log(ctx, "failed undoing reservation of snapshot: %s (%s)"),
requestName, errDefer)
}
}
}()

volOptions.SnapshotName = sID.FsSnapshotName
snap := snapshotInfo{}
snap, err = doSnapshot(ctx, *&vid.FsSubvolName, volOptions, req.GetSecrets())
return &csi.CreateSnapshotResponse{
Snapshot: &csi.Snapshot{
SizeBytes: volOptions.Size,
SnapshotId: sID.SnapshotID,
SourceVolumeId: req.GetSourceVolumeId(),
CreationTime: snap.CreationTime,
ReadyToUse: true,
},
}, nil
}

func doSnapshot(ctx context.Context, subvolumeName string, volOpt *volumeOptions, secret map[string]string) (snapshotInfo, error) {
volID := volumeID(subvolumeName)
snap := snapshotInfo{}
cr, err := util.NewAdminCredentials(secret)
if err != nil {
return snap, err
}
defer cr.DeleteCredentials()

err = createSnapshot(ctx, volOpt, cr, volID)
if err != nil {
return snap, err
}
defer func() {
if err != nil {
dErr := deleteSnapshot(ctx, volOpt, cr, volID)
if dErr != nil {
klog.Errorf(util.Log(ctx, "failed to delete snapshot %s %v"), volOpt.SnapshotName, err)
}
}
}()
snap, err = getSnapshotInfo(ctx, volOpt, cr, volID)
if err != nil {
return snap, err
}
tm := time.Time{}
tm, err = time.Parse(time.ANSIC, snap.CreatedAt)
if err != nil {
return snap, err
}
snap.CreationTime, err = ptypes.TimestampProto(tm)
if err != nil {
return snap, err
}
err = protectSnapshot(ctx, volOpt, cr, volID)
return snap, err
}

func (cs *ControllerServer) validateSnapshotReq(ctx context.Context, req *csi.CreateSnapshotRequest) error {
if err := cs.Driver.ValidateControllerServiceRequest(csi.ControllerServiceCapability_RPC_CREATE_DELETE_SNAPSHOT); err != nil {
klog.Errorf(util.Log(ctx, "invalid create snapshot req: %v"), protosanitizer.StripSecrets(req))
return err
}

// Check sanity of request Snapshot Name, Source Volume Id
if req.Name == "" {
return status.Error(codes.InvalidArgument, "snapshot Name cannot be empty")
}
if req.SourceVolumeId == "" {
return status.Error(codes.InvalidArgument, "source Volume ID cannot be empty")
}

return nil
}

// DeleteSnapshot deletes the snapshot in backend and removes the
// snapshot metadata from store
func (cs *ControllerServer) DeleteSnapshot(ctx context.Context, req *csi.DeleteSnapshotRequest) (*csi.DeleteSnapshotResponse, error) {
if err := cs.Driver.ValidateControllerServiceRequest(csi.ControllerServiceCapability_RPC_CREATE_DELETE_SNAPSHOT); err != nil {
klog.Errorf(util.Log(ctx, "invalid delete snapshot req: %v"), protosanitizer.StripSecrets(req))
return nil, err
}

cr, err := util.NewAdminCredentials(req.GetSecrets())
if err != nil {
return nil, status.Error(codes.InvalidArgument, err.Error())
}
defer cr.DeleteCredentials()
snapshotID := req.GetSnapshotId()
if snapshotID == "" {
return nil, status.Error(codes.InvalidArgument, "snapshot ID cannot be empty")
}

if acquired := cs.SnapshotLocks.TryAcquire(snapshotID); !acquired {
klog.Errorf(util.Log(ctx, util.SnapshotOperationAlreadyExistsFmt), snapshotID)
return nil, status.Errorf(codes.Aborted, util.SnapshotOperationAlreadyExistsFmt, snapshotID)
}
defer cs.SnapshotLocks.Release(snapshotID)

volOpt, sid, err := newSnapshotOptionsFromID(ctx, snapshotID, cr)
if err != nil {
// if error is ErrPoolNotFound, the pool is already deleted we dont
// need to worry about deleting snapshot or omap data, return success
var epnf util.ErrPoolNotFound
if errors.As(err, &epnf) {
klog.Warningf(util.Log(ctx, "failed to get backend snapshot for %s: %v"), snapshotID, err)
return &csi.DeleteSnapshotResponse{}, nil
}

// if error is ErrKeyNotFound, then a previous attempt at deletion was complete
// or partially complete (snap and snapOMap are garbage collected already), hence return
// success as deletion is complete
var eknf util.ErrKeyNotFound
if errors.As(err, &eknf) {
return &csi.DeleteSnapshotResponse{}, nil
}
var snof util.ErrSnapNotFound
if errors.As(err, &snof) {
err = undoSnapReservation(ctx, volOpt, *sid, req.GetSecrets())
if err != nil {
klog.Errorf(util.Log(ctx, "failed to remove reservation for snapname (%s) with backing snap (%s) (%s)"),
volOpt.RequestName, volOpt.SnapshotName, err)
return nil, status.Error(codes.Internal, err.Error())
}
}
return nil, status.Error(codes.Internal, err.Error())
}

// safeguard against parallel create or delete requests against the same
// name
if acquired := cs.SnapshotLocks.TryAcquire(volOpt.RequestName); !acquired {
klog.Errorf(util.Log(ctx, util.SnapshotOperationAlreadyExistsFmt), volOpt.RequestName)
return nil, status.Errorf(codes.Aborted, util.VolumeOperationAlreadyExistsFmt, volOpt.RequestName)
}
defer cs.SnapshotLocks.Release(volOpt.RequestName)

err = unprotectSnapshot(ctx, volOpt, cr, volumeID(sid.FsSubVolumeName))
if err != nil {
return nil, status.Error(codes.Internal, err.Error())
}
err = deleteSnapshot(ctx, volOpt, cr, volumeID(sid.FsSubVolumeName))
if err != nil {
return nil, status.Error(codes.Internal, err.Error())
}
err = undoSnapReservation(ctx, volOpt, *sid, req.GetSecrets())
if err != nil {
klog.Errorf(util.Log(ctx, "failed to remove reservation for snapname (%s) with backing snap (%s) (%s)"),
volOpt.RequestName, volOpt.SnapshotName, err)
return nil, status.Error(codes.Internal, err.Error())
}

return &csi.DeleteSnapshotResponse{}, nil
}
4 changes: 3 additions & 1 deletion internal/cephfs/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,8 @@ var (

// volJournal is used to maintain RADOS based journals for CO generated
// VolumeName to backing CephFS subvolumes
volJournal *journal.Config
volJournal *journal.Config
snapJournal *journal.Config
)

// NewDriver returns new ceph driver
Expand Down Expand Up @@ -112,6 +113,7 @@ func (fs *Driver) Run(conf *util.Config, cachePersister util.CachePersister) {
// Create an instance of the volume journal
volJournal = journal.NewCSIVolumeJournalWithNamespace(CSIInstanceID, radosNamespace)

snapJournal = journal.NewCSISnapshotJournalWithNamespace(CSIInstanceID, radosNamespace)
// Initialize default library driver

fs.cd = csicommon.NewCSIDriver(conf.DriverName, util.DriverVersion, conf.NodeID)
Expand Down
Loading

0 comments on commit 15a1cad

Please sign in to comment.