Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

dump and restore internal state #277

Merged
merged 2 commits into from
Apr 27, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions cmd/hostpathplugin/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ func main() {

flag.StringVar(&cfg.Endpoint, "endpoint", "unix://tmp/csi.sock", "CSI endpoint")
flag.StringVar(&cfg.DriverName, "drivername", "hostpath.csi.k8s.io", "name of the driver")
flag.StringVar(&cfg.StateDir, "statedir", "/csi-data-dir", "directory for storing state information across driver restarts, volumes and snapshots")
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Making this configurable is useful because then it is possible again to run the driver as non-root with a state directory in /tmp.

Node operations cannot be tested easily that way (need root for mounting), but even that could be achieved with wrapper scripts that rely on sudo.

flag.StringVar(&cfg.NodeID, "nodeid", "", "node id")
flag.BoolVar(&cfg.Ephemeral, "ephemeral", false, "publish volumes in ephemeral mode even if kubelet did not ask for it (only needed for Kubernetes 1.15)")
flag.Int64Var(&cfg.MaxVolumesPerNode, "maxvolumespernode", 0, "limit of volumes per node")
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ require (
github.com/golang/protobuf v1.4.3
github.com/kubernetes-csi/csi-lib-utils v0.9.0
github.com/pborman/uuid v1.2.1
github.com/stretchr/testify v1.6.1
github.com/stretchr/testify v1.7.0
golang.org/x/net v0.0.0-20201209123823-ac852fbbde11
golang.org/x/sys v0.0.0-20201207223542-d4d67f95c62d // indirect
google.golang.org/genproto v0.0.0-20201209185603-f92720507ed4 // indirect
Expand Down
3 changes: 2 additions & 1 deletion go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -505,8 +505,9 @@ github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXf
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4=
github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA=
github.com/stretchr/testify v1.6.1 h1:hDPOHmpOpP40lSULcqw7IrRb/u7w6RpDC9399XyoNd0=
github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.7.0 h1:nwc3DEeHmmLAfoZucVR881uASk0Mfjw8xYJ99tb5CcY=
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/subosito/gotenv v1.2.0/go.mod h1:N0PQaV/YGNqwC0u51sEeR/aUtSLEXKX9iv69rRypqCw=
github.com/syndtr/gocapability v0.0.0-20180916011248-d98352740cb2/go.mod h1:hkRG7XYTFWNJGYcbNJQlaLq0fg1yr4J4t/NcTQtrfww=
github.com/thecodeteam/goscaleio v0.1.0/go.mod h1:68sdkZAsK8bvEwBlbQnlLS+xU+hvLYM/iQ8KXej1AwM=
Expand Down
109 changes: 54 additions & 55 deletions pkg/hostpath/controllerserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (
"fmt"
"math"
"os"
"path/filepath"
"sort"
"strconv"

Expand All @@ -35,17 +34,12 @@ import (

"github.com/container-storage-interface/spec/lib/go/csi"
utilexec "k8s.io/utils/exec"
)

const (
deviceID = "deviceID"
"github.com/kubernetes-csi/csi-driver-host-path/pkg/state"
)

type accessType int

const (
mountAccess accessType = iota
blockAccess
deviceID = "deviceID"
)

func (hp *hostPath) CreateVolume(ctx context.Context, req *csi.CreateVolumeRequest) (resp *csi.CreateVolumeResponse, finalErr error) {
Expand Down Expand Up @@ -84,13 +78,13 @@ func (hp *hostPath) CreateVolume(ctx context.Context, req *csi.CreateVolumeReque
return nil, status.Error(codes.InvalidArgument, "cannot have both block and mount access type")
}

var requestedAccessType accessType
var requestedAccessType state.AccessType

if accessTypeBlock {
requestedAccessType = blockAccess
requestedAccessType = state.BlockAccess
} else {
// Default to mount.
requestedAccessType = mountAccess
requestedAccessType = state.MountAccess
}

// Lock before acting on global state. A production-quality
Expand All @@ -106,7 +100,7 @@ func (hp *hostPath) CreateVolume(ctx context.Context, req *csi.CreateVolumeReque

// Need to check for already existing volume name, and if found
// check for the requested capacity and already allocated capacity
if exVol, err := hp.getVolumeByName(req.GetName()); err == nil {
if exVol, err := hp.state.GetVolumeByName(req.GetName()); err == nil {
// Since err is nil, it means the volume with the same name already exists
// need to check if the size of existing volume is the same as in new
// request
Expand Down Expand Up @@ -149,7 +143,7 @@ func (hp *hostPath) CreateVolume(ctx context.Context, req *csi.CreateVolumeReque
glog.V(4).Infof("created volume %s at path %s", vol.VolID, vol.VolPath)

if req.GetVolumeContentSource() != nil {
path := getVolumePath(volumeID)
path := hp.getVolumePath(volumeID)
volumeSource := req.VolumeContentSource
switch volumeSource.Type.(type) {
case *csi.VolumeContentSource_Snapshot:
Expand Down Expand Up @@ -203,7 +197,7 @@ func (hp *hostPath) DeleteVolume(ctx context.Context, req *csi.DeleteVolumeReque
defer hp.mutex.Unlock()

volId := req.GetVolumeId()
vol, err := hp.getVolumeByID(volId)
vol, err := hp.state.GetVolumeByID(volId)
if err != nil {
// Volume not found: might have already deleted
return &csi.DeleteVolumeResponse{}, nil
Expand Down Expand Up @@ -243,7 +237,7 @@ func (hp *hostPath) ValidateVolumeCapabilities(ctx context.Context, req *csi.Val
hp.mutex.Lock()
defer hp.mutex.Unlock()

if _, err := hp.getVolumeByID(req.GetVolumeId()); err != nil {
if _, err := hp.state.GetVolumeByID(req.GetVolumeId()); err != nil {
return nil, err
}

Expand Down Expand Up @@ -287,7 +281,7 @@ func (hp *hostPath) ControllerPublishVolume(ctx context.Context, req *csi.Contro
hp.mutex.Lock()
defer hp.mutex.Unlock()

vol, err := hp.getVolumeByID(req.VolumeId)
vol, err := hp.state.GetVolumeByID(req.VolumeId)
if err != nil {
return nil, status.Error(codes.NotFound, err.Error())
}
Expand All @@ -311,8 +305,8 @@ func (hp *hostPath) ControllerPublishVolume(ctx context.Context, req *csi.Contro

vol.IsAttached = true
vol.ReadOnlyAttach = req.GetReadonly()
if err := hp.updateVolume(vol.VolID, vol); err != nil {
return nil, status.Errorf(codes.Internal, "failed to update volume %s: %v", vol.VolID, err)
if err := hp.state.UpdateVolume(vol); err != nil {
return nil, err
}

return &csi.ControllerPublishVolumeResponse{
Expand All @@ -337,7 +331,7 @@ func (hp *hostPath) ControllerUnpublishVolume(ctx context.Context, req *csi.Cont
hp.mutex.Lock()
defer hp.mutex.Unlock()

vol, err := hp.getVolumeByID(req.VolumeId)
vol, err := hp.state.GetVolumeByID(req.VolumeId)
if err != nil {
// Not an error: a non-existent volume is not published.
// See also https://github.com/kubernetes-csi/external-attacher/pull/165
Expand All @@ -351,7 +345,7 @@ func (hp *hostPath) ControllerUnpublishVolume(ctx context.Context, req *csi.Cont
}

vol.IsAttached = false
if err := hp.updateVolume(vol.VolID, vol); err != nil {
if err := hp.state.UpdateVolume(vol); err != nil {
return nil, status.Errorf(codes.Internal, "could not update volume %s: %v", vol.VolID, err)
}

Expand Down Expand Up @@ -399,15 +393,20 @@ func (hp *hostPath) ListVolumes(ctx context.Context, req *csi.ListVolumesRequest

var (
startIdx, volumesLength, maxLength int64
hpVolume hostPathVolume
hpVolume state.Volume
)

// Lock before acting on global state. A production-quality
// driver might use more fine-grained locking.
hp.mutex.Lock()
defer hp.mutex.Unlock()

volumeIds := hp.getSortedVolumeIDs()
// Sort by volume ID.
volumes := hp.state.GetVolumes()
sort.Slice(volumes, func(i, j int) bool {
return volumes[i].VolID < volumes[j].VolID
})

if req.StartingToken == "" {
req.StartingToken = "1"
}
Expand All @@ -417,16 +416,16 @@ func (hp *hostPath) ListVolumes(ctx context.Context, req *csi.ListVolumesRequest
return nil, status.Error(codes.Aborted, "The type of startingToken should be integer")
}

volumesLength = int64(len(volumeIds))
volumesLength = int64(len(volumes))
maxLength = int64(req.MaxEntries)

if maxLength > volumesLength || maxLength <= 0 {
maxLength = volumesLength
}

for index := startIdx - 1; index < volumesLength && index < maxLength; index++ {
hpVolume = hp.volumes[volumeIds[index]]
healthy, msg := hp.doHealthCheckInControllerSide(volumeIds[index])
hpVolume = volumes[index]
healthy, msg := hp.doHealthCheckInControllerSide(hpVolume.VolID)
glog.V(3).Infof("Healthy state: %s Volume: %t", hpVolume.VolName, healthy)
volumeRes.Entries = append(volumeRes.Entries, &csi.ListVolumesResponse_Entry{
Volume: &csi.Volume{
Expand All @@ -453,7 +452,7 @@ func (hp *hostPath) ControllerGetVolume(ctx context.Context, req *csi.Controller
hp.mutex.Lock()
defer hp.mutex.Unlock()

volume, err := hp.getVolumeByID(req.GetVolumeId())
volume, err := hp.state.GetVolumeByID(req.GetVolumeId())
if err != nil {
return nil, err
}
Expand All @@ -475,11 +474,6 @@ func (hp *hostPath) ControllerGetVolume(ctx context.Context, req *csi.Controller
}, nil
}

// getSnapshotPath returns the full path to where the snapshot is stored
func getSnapshotPath(snapshotID string) string {
return filepath.Join(dataRoot, fmt.Sprintf("%s%s", snapshotID, snapshotExt))
}

// CreateSnapshot uses tar command to create snapshot for hostpath volume. The tar command can quickly create
// archives of entire directories. The host image must have "tar" binaries in /bin, /usr/sbin, or /usr/bin.
func (hp *hostPath) CreateSnapshot(ctx context.Context, req *csi.CreateSnapshotRequest) (*csi.CreateSnapshotResponse, error) {
Expand All @@ -503,7 +497,7 @@ func (hp *hostPath) CreateSnapshot(ctx context.Context, req *csi.CreateSnapshotR

// Need to check for already existing snapshot name, and if found check for the
// requested sourceVolumeId and sourceVolumeId of snapshot that has been created.
if exSnap, err := hp.getSnapshotByName(req.GetName()); err == nil {
if exSnap, err := hp.state.GetSnapshotByName(req.GetName()); err == nil {
// Since err is nil, it means the snapshot with the same name already exists need
// to check if the sourceVolumeId of existing snapshot is the same as in new request.
if exSnap.VolID == req.GetSourceVolumeId() {
Expand All @@ -522,18 +516,18 @@ func (hp *hostPath) CreateSnapshot(ctx context.Context, req *csi.CreateSnapshotR
}

volumeID := req.GetSourceVolumeId()
hostPathVolume, err := hp.getVolumeByID(volumeID)
hostPathVolume, err := hp.state.GetVolumeByID(volumeID)
if err != nil {
return nil, err
}

snapshotID := uuid.NewUUID().String()
creationTime := ptypes.TimestampNow()
volPath := hostPathVolume.VolPath
file := getSnapshotPath(snapshotID)
file := hp.getSnapshotPath(snapshotID)

var cmd []string
if hostPathVolume.VolAccessType == blockAccess {
if hostPathVolume.VolAccessType == state.BlockAccess {
glog.V(4).Infof("Creating snapshot of Raw Block Mode Volume")
cmd = []string{"cp", volPath, file}
} else {
Expand All @@ -547,7 +541,7 @@ func (hp *hostPath) CreateSnapshot(ctx context.Context, req *csi.CreateSnapshotR
}

glog.V(4).Infof("create volume snapshot %s", file)
snapshot := hostPathSnapshot{}
snapshot := state.Snapshot{}
snapshot.Name = req.GetName()
snapshot.Id = snapshotID
snapshot.VolID = volumeID
Expand All @@ -556,8 +550,9 @@ func (hp *hostPath) CreateSnapshot(ctx context.Context, req *csi.CreateSnapshotR
snapshot.SizeBytes = hostPathVolume.VolSize
snapshot.ReadyToUse = true

hp.snapshots[snapshotID] = snapshot

if err := hp.state.UpdateSnapshot(snapshot); err != nil {
return nil, err
}
return &csi.CreateSnapshotResponse{
Snapshot: &csi.Snapshot{
SnapshotId: snapshot.Id,
Expand Down Expand Up @@ -587,9 +582,11 @@ func (hp *hostPath) DeleteSnapshot(ctx context.Context, req *csi.DeleteSnapshotR
defer hp.mutex.Unlock()

glog.V(4).Infof("deleting snapshot %s", snapshotID)
path := getSnapshotPath(snapshotID)
path := hp.getSnapshotPath(snapshotID)
os.RemoveAll(path)
delete(hp.snapshots, snapshotID)
if err := hp.state.DeleteSnapshot(snapshotID); err != nil {
return nil, err
}
return &csi.DeleteSnapshotResponse{}, nil
}

Expand All @@ -604,33 +601,35 @@ func (hp *hostPath) ListSnapshots(ctx context.Context, req *csi.ListSnapshotsReq
hp.mutex.Lock()
defer hp.mutex.Unlock()

// case 1: SnapshotId is not empty, return snapshots that match the snapshot id.
// case 1: SnapshotId is not empty, return snapshots that match the snapshot id,
// none if not found.
if len(req.GetSnapshotId()) != 0 {
snapshotID := req.SnapshotId
if snapshot, ok := hp.snapshots[snapshotID]; ok {
if snapshot, err := hp.state.GetSnapshotByID(snapshotID); err == nil {
return convertSnapshot(snapshot), nil
}
return &csi.ListSnapshotsResponse{}, nil
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This return here was missing earlier. It only passed because the test is not very precise:
https://github.com/kubernetes-csi/csi-test/blob/a251c44fd49d9eedd55a40b71e5da4ad080ba431/pkg/sanity/controller.go#L1157-L1165

I created an issue for it:
kubernetes-csi/csi-test#335

}

// case 2: SourceVolumeId is not empty, return snapshots that match the source volume id.
// case 2: SourceVolumeId is not empty, return snapshots that match the source volume id,
// none if not found.
if len(req.GetSourceVolumeId()) != 0 {
for _, snapshot := range hp.snapshots {
for _, snapshot := range hp.state.GetSnapshots() {
if snapshot.VolID == req.SourceVolumeId {
return convertSnapshot(snapshot), nil
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This returns as soon as one snapshot's volume id matches the request, but it is possible that there are multiple snapshots with the same source volume id. I see that this is in the original code so this can be fixed in a different PR.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree, let's fix this separately. Please file a bug so that we don't forget.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

And also we should have a csi-sanity test for it...

}
}
return &csi.ListSnapshotsResponse{}, nil
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same here.

}

var snapshots []csi.Snapshot
// case 3: no parameter is set, so we return all the snapshots.
sortedKeys := make([]string, 0)
for k := range hp.snapshots {
sortedKeys = append(sortedKeys, k)
}
sort.Strings(sortedKeys)
hpSnapshots := hp.state.GetSnapshots()
sort.Slice(hpSnapshots, func(i, j int) bool {
return hpSnapshots[i].Id < hpSnapshots[j].Id
})

for _, key := range sortedKeys {
snap := hp.snapshots[key]
for _, snap := range hpSnapshots {
snapshot := csi.Snapshot{
SnapshotId: snap.Id,
SourceVolumeId: snap.VolID,
Expand Down Expand Up @@ -725,15 +724,15 @@ func (hp *hostPath) ControllerExpandVolume(ctx context.Context, req *csi.Control
hp.mutex.Lock()
defer hp.mutex.Unlock()

exVol, err := hp.getVolumeByID(volID)
exVol, err := hp.state.GetVolumeByID(volID)
if err != nil {
return nil, err
}

if exVol.VolSize < capacity {
exVol.VolSize = capacity
if err := hp.updateVolume(volID, exVol); err != nil {
return nil, fmt.Errorf("could not update volume %s: %w", volID, err)
if err := hp.state.UpdateVolume(exVol); err != nil {
return nil, err
}
}

Expand All @@ -743,7 +742,7 @@ func (hp *hostPath) ControllerExpandVolume(ctx context.Context, req *csi.Control
}, nil
}

func convertSnapshot(snap hostPathSnapshot) *csi.ListSnapshotsResponse {
func convertSnapshot(snap state.Snapshot) *csi.ListSnapshotsResponse {
entries := []*csi.ListSnapshotsResponse_Entry{
{
Snapshot: &csi.Snapshot{
Expand Down
Loading