-
Notifications
You must be signed in to change notification settings - Fork 217
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
dump and restore internal state #277
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -20,7 +20,6 @@ import ( | |
"fmt" | ||
"math" | ||
"os" | ||
"path/filepath" | ||
"sort" | ||
"strconv" | ||
|
||
|
@@ -35,17 +34,12 @@ import ( | |
|
||
"github.com/container-storage-interface/spec/lib/go/csi" | ||
utilexec "k8s.io/utils/exec" | ||
) | ||
|
||
const ( | ||
deviceID = "deviceID" | ||
"github.com/kubernetes-csi/csi-driver-host-path/pkg/state" | ||
) | ||
|
||
type accessType int | ||
|
||
const ( | ||
mountAccess accessType = iota | ||
blockAccess | ||
deviceID = "deviceID" | ||
) | ||
|
||
func (hp *hostPath) CreateVolume(ctx context.Context, req *csi.CreateVolumeRequest) (resp *csi.CreateVolumeResponse, finalErr error) { | ||
|
@@ -84,13 +78,13 @@ func (hp *hostPath) CreateVolume(ctx context.Context, req *csi.CreateVolumeReque | |
return nil, status.Error(codes.InvalidArgument, "cannot have both block and mount access type") | ||
} | ||
|
||
var requestedAccessType accessType | ||
var requestedAccessType state.AccessType | ||
|
||
if accessTypeBlock { | ||
requestedAccessType = blockAccess | ||
requestedAccessType = state.BlockAccess | ||
} else { | ||
// Default to mount. | ||
requestedAccessType = mountAccess | ||
requestedAccessType = state.MountAccess | ||
} | ||
|
||
// Lock before acting on global state. A production-quality | ||
|
@@ -106,7 +100,7 @@ func (hp *hostPath) CreateVolume(ctx context.Context, req *csi.CreateVolumeReque | |
|
||
// Need to check for already existing volume name, and if found | ||
// check for the requested capacity and already allocated capacity | ||
if exVol, err := hp.getVolumeByName(req.GetName()); err == nil { | ||
if exVol, err := hp.state.GetVolumeByName(req.GetName()); err == nil { | ||
// Since err is nil, it means the volume with the same name already exists | ||
// need to check if the size of existing volume is the same as in new | ||
// request | ||
|
@@ -149,7 +143,7 @@ func (hp *hostPath) CreateVolume(ctx context.Context, req *csi.CreateVolumeReque | |
glog.V(4).Infof("created volume %s at path %s", vol.VolID, vol.VolPath) | ||
|
||
if req.GetVolumeContentSource() != nil { | ||
path := getVolumePath(volumeID) | ||
path := hp.getVolumePath(volumeID) | ||
volumeSource := req.VolumeContentSource | ||
switch volumeSource.Type.(type) { | ||
case *csi.VolumeContentSource_Snapshot: | ||
|
@@ -203,7 +197,7 @@ func (hp *hostPath) DeleteVolume(ctx context.Context, req *csi.DeleteVolumeReque | |
defer hp.mutex.Unlock() | ||
|
||
volId := req.GetVolumeId() | ||
vol, err := hp.getVolumeByID(volId) | ||
vol, err := hp.state.GetVolumeByID(volId) | ||
if err != nil { | ||
// Volume not found: might have already deleted | ||
return &csi.DeleteVolumeResponse{}, nil | ||
|
@@ -243,7 +237,7 @@ func (hp *hostPath) ValidateVolumeCapabilities(ctx context.Context, req *csi.Val | |
hp.mutex.Lock() | ||
defer hp.mutex.Unlock() | ||
|
||
if _, err := hp.getVolumeByID(req.GetVolumeId()); err != nil { | ||
if _, err := hp.state.GetVolumeByID(req.GetVolumeId()); err != nil { | ||
return nil, err | ||
} | ||
|
||
|
@@ -287,7 +281,7 @@ func (hp *hostPath) ControllerPublishVolume(ctx context.Context, req *csi.Contro | |
hp.mutex.Lock() | ||
defer hp.mutex.Unlock() | ||
|
||
vol, err := hp.getVolumeByID(req.VolumeId) | ||
vol, err := hp.state.GetVolumeByID(req.VolumeId) | ||
if err != nil { | ||
return nil, status.Error(codes.NotFound, err.Error()) | ||
} | ||
|
@@ -311,8 +305,8 @@ func (hp *hostPath) ControllerPublishVolume(ctx context.Context, req *csi.Contro | |
|
||
vol.IsAttached = true | ||
vol.ReadOnlyAttach = req.GetReadonly() | ||
if err := hp.updateVolume(vol.VolID, vol); err != nil { | ||
return nil, status.Errorf(codes.Internal, "failed to update volume %s: %v", vol.VolID, err) | ||
if err := hp.state.UpdateVolume(vol); err != nil { | ||
return nil, err | ||
} | ||
|
||
return &csi.ControllerPublishVolumeResponse{ | ||
|
@@ -337,7 +331,7 @@ func (hp *hostPath) ControllerUnpublishVolume(ctx context.Context, req *csi.Cont | |
hp.mutex.Lock() | ||
defer hp.mutex.Unlock() | ||
|
||
vol, err := hp.getVolumeByID(req.VolumeId) | ||
vol, err := hp.state.GetVolumeByID(req.VolumeId) | ||
if err != nil { | ||
// Not an error: a non-existent volume is not published. | ||
// See also https://github.com/kubernetes-csi/external-attacher/pull/165 | ||
|
@@ -351,7 +345,7 @@ func (hp *hostPath) ControllerUnpublishVolume(ctx context.Context, req *csi.Cont | |
} | ||
|
||
vol.IsAttached = false | ||
if err := hp.updateVolume(vol.VolID, vol); err != nil { | ||
if err := hp.state.UpdateVolume(vol); err != nil { | ||
return nil, status.Errorf(codes.Internal, "could not update volume %s: %v", vol.VolID, err) | ||
} | ||
|
||
|
@@ -399,15 +393,20 @@ func (hp *hostPath) ListVolumes(ctx context.Context, req *csi.ListVolumesRequest | |
|
||
var ( | ||
startIdx, volumesLength, maxLength int64 | ||
hpVolume hostPathVolume | ||
hpVolume state.Volume | ||
) | ||
|
||
// Lock before acting on global state. A production-quality | ||
// driver might use more fine-grained locking. | ||
hp.mutex.Lock() | ||
defer hp.mutex.Unlock() | ||
|
||
volumeIds := hp.getSortedVolumeIDs() | ||
// Sort by volume ID. | ||
volumes := hp.state.GetVolumes() | ||
sort.Slice(volumes, func(i, j int) bool { | ||
return volumes[i].VolID < volumes[j].VolID | ||
}) | ||
|
||
if req.StartingToken == "" { | ||
req.StartingToken = "1" | ||
} | ||
|
@@ -417,16 +416,16 @@ func (hp *hostPath) ListVolumes(ctx context.Context, req *csi.ListVolumesRequest | |
return nil, status.Error(codes.Aborted, "The type of startingToken should be integer") | ||
} | ||
|
||
volumesLength = int64(len(volumeIds)) | ||
volumesLength = int64(len(volumes)) | ||
maxLength = int64(req.MaxEntries) | ||
|
||
if maxLength > volumesLength || maxLength <= 0 { | ||
maxLength = volumesLength | ||
} | ||
|
||
for index := startIdx - 1; index < volumesLength && index < maxLength; index++ { | ||
hpVolume = hp.volumes[volumeIds[index]] | ||
healthy, msg := hp.doHealthCheckInControllerSide(volumeIds[index]) | ||
hpVolume = volumes[index] | ||
healthy, msg := hp.doHealthCheckInControllerSide(hpVolume.VolID) | ||
glog.V(3).Infof("Healthy state: %s Volume: %t", hpVolume.VolName, healthy) | ||
volumeRes.Entries = append(volumeRes.Entries, &csi.ListVolumesResponse_Entry{ | ||
Volume: &csi.Volume{ | ||
|
@@ -453,7 +452,7 @@ func (hp *hostPath) ControllerGetVolume(ctx context.Context, req *csi.Controller | |
hp.mutex.Lock() | ||
defer hp.mutex.Unlock() | ||
|
||
volume, err := hp.getVolumeByID(req.GetVolumeId()) | ||
volume, err := hp.state.GetVolumeByID(req.GetVolumeId()) | ||
if err != nil { | ||
return nil, err | ||
} | ||
|
@@ -475,11 +474,6 @@ func (hp *hostPath) ControllerGetVolume(ctx context.Context, req *csi.Controller | |
}, nil | ||
} | ||
|
||
// getSnapshotPath returns the full path to where the snapshot is stored | ||
func getSnapshotPath(snapshotID string) string { | ||
return filepath.Join(dataRoot, fmt.Sprintf("%s%s", snapshotID, snapshotExt)) | ||
} | ||
|
||
// CreateSnapshot uses tar command to create snapshot for hostpath volume. The tar command can quickly create | ||
// archives of entire directories. The host image must have "tar" binaries in /bin, /usr/sbin, or /usr/bin. | ||
func (hp *hostPath) CreateSnapshot(ctx context.Context, req *csi.CreateSnapshotRequest) (*csi.CreateSnapshotResponse, error) { | ||
|
@@ -503,7 +497,7 @@ func (hp *hostPath) CreateSnapshot(ctx context.Context, req *csi.CreateSnapshotR | |
|
||
// Need to check for already existing snapshot name, and if found check for the | ||
// requested sourceVolumeId and sourceVolumeId of snapshot that has been created. | ||
if exSnap, err := hp.getSnapshotByName(req.GetName()); err == nil { | ||
if exSnap, err := hp.state.GetSnapshotByName(req.GetName()); err == nil { | ||
// Since err is nil, it means the snapshot with the same name already exists need | ||
// to check if the sourceVolumeId of existing snapshot is the same as in new request. | ||
if exSnap.VolID == req.GetSourceVolumeId() { | ||
|
@@ -522,18 +516,18 @@ func (hp *hostPath) CreateSnapshot(ctx context.Context, req *csi.CreateSnapshotR | |
} | ||
|
||
volumeID := req.GetSourceVolumeId() | ||
hostPathVolume, err := hp.getVolumeByID(volumeID) | ||
hostPathVolume, err := hp.state.GetVolumeByID(volumeID) | ||
if err != nil { | ||
return nil, err | ||
} | ||
|
||
snapshotID := uuid.NewUUID().String() | ||
creationTime := ptypes.TimestampNow() | ||
volPath := hostPathVolume.VolPath | ||
file := getSnapshotPath(snapshotID) | ||
file := hp.getSnapshotPath(snapshotID) | ||
|
||
var cmd []string | ||
if hostPathVolume.VolAccessType == blockAccess { | ||
if hostPathVolume.VolAccessType == state.BlockAccess { | ||
glog.V(4).Infof("Creating snapshot of Raw Block Mode Volume") | ||
cmd = []string{"cp", volPath, file} | ||
} else { | ||
|
@@ -547,7 +541,7 @@ func (hp *hostPath) CreateSnapshot(ctx context.Context, req *csi.CreateSnapshotR | |
} | ||
|
||
glog.V(4).Infof("create volume snapshot %s", file) | ||
snapshot := hostPathSnapshot{} | ||
snapshot := state.Snapshot{} | ||
snapshot.Name = req.GetName() | ||
snapshot.Id = snapshotID | ||
snapshot.VolID = volumeID | ||
|
@@ -556,8 +550,9 @@ func (hp *hostPath) CreateSnapshot(ctx context.Context, req *csi.CreateSnapshotR | |
snapshot.SizeBytes = hostPathVolume.VolSize | ||
snapshot.ReadyToUse = true | ||
|
||
hp.snapshots[snapshotID] = snapshot | ||
|
||
if err := hp.state.UpdateSnapshot(snapshot); err != nil { | ||
return nil, err | ||
} | ||
return &csi.CreateSnapshotResponse{ | ||
Snapshot: &csi.Snapshot{ | ||
SnapshotId: snapshot.Id, | ||
|
@@ -587,9 +582,11 @@ func (hp *hostPath) DeleteSnapshot(ctx context.Context, req *csi.DeleteSnapshotR | |
defer hp.mutex.Unlock() | ||
|
||
glog.V(4).Infof("deleting snapshot %s", snapshotID) | ||
path := getSnapshotPath(snapshotID) | ||
path := hp.getSnapshotPath(snapshotID) | ||
os.RemoveAll(path) | ||
delete(hp.snapshots, snapshotID) | ||
if err := hp.state.DeleteSnapshot(snapshotID); err != nil { | ||
return nil, err | ||
} | ||
return &csi.DeleteSnapshotResponse{}, nil | ||
} | ||
|
||
|
@@ -604,33 +601,35 @@ func (hp *hostPath) ListSnapshots(ctx context.Context, req *csi.ListSnapshotsReq | |
hp.mutex.Lock() | ||
defer hp.mutex.Unlock() | ||
|
||
// case 1: SnapshotId is not empty, return snapshots that match the snapshot id. | ||
// case 1: SnapshotId is not empty, return snapshots that match the snapshot id, | ||
// none if not found. | ||
if len(req.GetSnapshotId()) != 0 { | ||
snapshotID := req.SnapshotId | ||
if snapshot, ok := hp.snapshots[snapshotID]; ok { | ||
if snapshot, err := hp.state.GetSnapshotByID(snapshotID); err == nil { | ||
return convertSnapshot(snapshot), nil | ||
} | ||
return &csi.ListSnapshotsResponse{}, nil | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This I created an issue for it: |
||
} | ||
|
||
// case 2: SourceVolumeId is not empty, return snapshots that match the source volume id. | ||
// case 2: SourceVolumeId is not empty, return snapshots that match the source volume id, | ||
// none if not found. | ||
if len(req.GetSourceVolumeId()) != 0 { | ||
for _, snapshot := range hp.snapshots { | ||
for _, snapshot := range hp.state.GetSnapshots() { | ||
if snapshot.VolID == req.SourceVolumeId { | ||
return convertSnapshot(snapshot), nil | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This returns as soon as one snapshot's volume id matches the request, but it is possible that there are multiple snapshots with the same source volume id. I see that this is in the original code so this can be fixed in a different PR. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I agree, let's fix this separately. Please file a bug so that we don't forget. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. And also we should have a csi-sanity test for it... |
||
} | ||
} | ||
return &csi.ListSnapshotsResponse{}, nil | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Same here. |
||
} | ||
|
||
var snapshots []csi.Snapshot | ||
// case 3: no parameter is set, so we return all the snapshots. | ||
sortedKeys := make([]string, 0) | ||
for k := range hp.snapshots { | ||
sortedKeys = append(sortedKeys, k) | ||
} | ||
sort.Strings(sortedKeys) | ||
hpSnapshots := hp.state.GetSnapshots() | ||
sort.Slice(hpSnapshots, func(i, j int) bool { | ||
return hpSnapshots[i].Id < hpSnapshots[j].Id | ||
}) | ||
|
||
for _, key := range sortedKeys { | ||
snap := hp.snapshots[key] | ||
for _, snap := range hpSnapshots { | ||
snapshot := csi.Snapshot{ | ||
SnapshotId: snap.Id, | ||
SourceVolumeId: snap.VolID, | ||
|
@@ -725,15 +724,15 @@ func (hp *hostPath) ControllerExpandVolume(ctx context.Context, req *csi.Control | |
hp.mutex.Lock() | ||
defer hp.mutex.Unlock() | ||
|
||
exVol, err := hp.getVolumeByID(volID) | ||
exVol, err := hp.state.GetVolumeByID(volID) | ||
if err != nil { | ||
return nil, err | ||
} | ||
|
||
if exVol.VolSize < capacity { | ||
exVol.VolSize = capacity | ||
if err := hp.updateVolume(volID, exVol); err != nil { | ||
return nil, fmt.Errorf("could not update volume %s: %w", volID, err) | ||
if err := hp.state.UpdateVolume(exVol); err != nil { | ||
return nil, err | ||
} | ||
} | ||
|
||
|
@@ -743,7 +742,7 @@ func (hp *hostPath) ControllerExpandVolume(ctx context.Context, req *csi.Control | |
}, nil | ||
} | ||
|
||
func convertSnapshot(snap hostPathSnapshot) *csi.ListSnapshotsResponse { | ||
func convertSnapshot(snap state.Snapshot) *csi.ListSnapshotsResponse { | ||
entries := []*csi.ListSnapshotsResponse_Entry{ | ||
{ | ||
Snapshot: &csi.Snapshot{ | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Making this configurable is useful because then it is possible again to run the driver as non-root with a state directory in /tmp.
Node operations cannot be tested easily that way (need root for mounting), but even that could be achieved with wrapper scripts that rely on sudo.