Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

support block volume #132

Merged
merged 6 commits into from
Jan 17, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions deploy/rbd/kubernetes/csi-provisioner-rbac.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,9 @@ rules:
- apiGroups: [""]
resources: ["events"]
verbs: ["list", "watch", "create", "update", "patch"]
- apiGroups: [""]
resources: ["endpoints"]
verbs: ["get", "create", "update"]

---
kind: ClusterRoleBinding
Expand Down
3 changes: 1 addition & 2 deletions deploy/rbd/kubernetes/csi-rbdplugin-provisioner.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,8 @@ spec:
serviceAccount: csi-provisioner
containers:
- name: csi-provisioner
image: quay.io/k8scsi/csi-provisioner:v1.0.0
image: quay.io/k8scsi/csi-provisioner:canary
args:
- "--provisioner=csi-rbdplugin"
- "--csi-address=$(ADDRESS)"
- "--v=5"
env:
Expand Down
7 changes: 7 additions & 0 deletions deploy/rbd/kubernetes/csi-rbdplugin.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,9 @@ spec:
- name: pods-mount-dir
mountPath: /var/lib/kubelet/pods
mountPropagation: "Bidirectional"
- name: plugin-mount-dir
mountPath: /var/lib/kubelet/plugins/kubernetes.io/csi/volumeDevices/
mountPropagation: "Bidirectional"
- mountPath: /dev
name: host-dev
- mountPath: /rootfs
Expand All @@ -86,6 +89,10 @@ spec:
hostPath:
path: /var/lib/kubelet/plugins_registry/csi-rbdplugin
type: DirectoryOrCreate
- name: plugin-mount-dir
hostPath:
path: /var/lib/kubelet/plugins/kubernetes.io/csi/volumeDevices/
type: DirectoryOrCreate
- name: registration-dir
hostPath:
path: /var/lib/kubelet/plugins_registry/
Expand Down
5 changes: 0 additions & 5 deletions pkg/rbd/controllerserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,11 +77,6 @@ func (cs *controllerServer) CreateVolume(ctx context.Context, req *csi.CreateVol
if req.VolumeCapabilities == nil {
return nil, status.Error(codes.InvalidArgument, "Volume Capabilities cannot be empty")
}
for _, cap := range req.VolumeCapabilities {
if cap.GetBlock() != nil {
return nil, status.Error(codes.Unimplemented, "Block Volume not supported")
}
}

volumeNameMutex.LockKey(req.GetName())
defer volumeNameMutex.UnlockKey(req.GetName())
Expand Down
137 changes: 115 additions & 22 deletions pkg/rbd/nodeserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ package rbd
import (
"fmt"
"os"
"os/exec"
"regexp"
"strings"

"github.com/golang/glog"
Expand All @@ -40,21 +42,45 @@ type nodeServer struct {

func (ns *nodeServer) NodePublishVolume(ctx context.Context, req *csi.NodePublishVolumeRequest) (*csi.NodePublishVolumeResponse, error) {
targetPath := req.GetTargetPath()

if !strings.HasSuffix(targetPath, "/mount") {
return nil, fmt.Errorf("rnd: malformed the value of target path: %s", targetPath)
}
s := strings.Split(strings.TrimSuffix(targetPath, "/mount"), "/")
volName := s[len(s)-1]

targetPathMutex.LockKey(targetPath)
defer targetPathMutex.UnlockKey(targetPath)

notMnt, err := ns.mounter.IsLikelyNotMountPoint(targetPath)
var volName string
isBlock := req.GetVolumeCapability().GetBlock() != nil

if isBlock {
// Get volName from targetPath
s := strings.Split(targetPath, "/")
volName = s[len(s)-1]
} else {
// Get volName from targetPath
if !strings.HasSuffix(targetPath, "/mount") {
return nil, fmt.Errorf("rbd: malformed the value of target path: %s", targetPath)
}
s := strings.Split(strings.TrimSuffix(targetPath, "/mount"), "/")
volName = s[len(s)-1]
}

// Check if that target path exists properly
notMnt, err := ns.mounter.IsNotMountPoint(targetPath)
if err != nil {
if os.IsNotExist(err) {
if err = os.MkdirAll(targetPath, 0750); err != nil {
return nil, status.Error(codes.Internal, err.Error())
if isBlock {
// create an empty file
targetPathFile, err := os.OpenFile(targetPath, os.O_CREATE|os.O_RDWR, 0750)
if err != nil {
glog.V(4).Infof("Failed to create targetPath:%s with error: %v", targetPath, err)
return nil, status.Error(codes.Internal, err.Error())
}
if err := targetPathFile.Close(); err != nil {
glog.V(4).Infof("Failed to close targetPath:%s with error: %v", targetPath, err)
return nil, status.Error(codes.Internal, err.Error())
}
} else {
// Create a directory
if err = os.MkdirAll(targetPath, 0750); err != nil {
return nil, status.Error(codes.Internal, err.Error())
}
}
notMnt = true
} else {
Expand All @@ -76,23 +102,31 @@ func (ns *nodeServer) NodePublishVolume(ctx context.Context, req *csi.NodePublis
return nil, err
}
glog.V(4).Infof("rbd image: %s/%s was successfully mapped at %s\n", req.GetVolumeId(), volOptions.Pool, devicePath)
fsType := req.GetVolumeCapability().GetMount().GetFsType()

// Publish Path
fsType := req.GetVolumeCapability().GetMount().GetFsType()
readOnly := req.GetReadonly()
attrib := req.GetVolumeContext()
mountFlags := req.GetVolumeCapability().GetMount().GetMountFlags()

glog.V(4).Infof("target %v\nfstype %v\ndevice %v\nreadonly %v\nattributes %v\n mountflags %v\n",
targetPath, fsType, devicePath, readOnly, attrib, mountFlags)

options := []string{}
if readOnly {
options = append(options, "ro")
}
glog.V(4).Infof("target %v\nisBlock %v\nfstype %v\ndevice %v\nreadonly %v\nattributes %v\n mountflags %v\n",
targetPath, isBlock, fsType, devicePath, readOnly, attrib, mountFlags)

diskMounter := &mount.SafeFormatAndMount{Interface: ns.mounter, Exec: mount.NewOsExec()}
if err := diskMounter.FormatAndMount(devicePath, targetPath, fsType, options); err != nil {
return nil, err
if isBlock {
options := []string{"bind"}
if err := diskMounter.Mount(devicePath, targetPath, fsType, options); err != nil {
return nil, err
}
} else {
options := []string{}
if readOnly {
options = append(options, "ro")
}

if err := diskMounter.FormatAndMount(devicePath, targetPath, fsType, options); err != nil {
return nil, err
}
}

return &csi.NodePublishVolumeResponse{}, nil
Expand All @@ -103,11 +137,18 @@ func (ns *nodeServer) NodeUnpublishVolume(ctx context.Context, req *csi.NodeUnpu
targetPathMutex.LockKey(targetPath)
defer targetPathMutex.UnlockKey(targetPath)

notMnt, err := ns.mounter.IsLikelyNotMountPoint(targetPath)
notMnt, err := ns.mounter.IsNotMountPoint(targetPath)
if err != nil {
return nil, status.Error(codes.Internal, err.Error())
if os.IsNotExist(err) {
// targetPath has already been deleted
glog.V(4).Infof("targetPath: %s has already been deleted", targetPath)
return &csi.NodeUnpublishVolumeResponse{}, nil
}
return nil, status.Error(codes.NotFound, err.Error())
}
if notMnt {
// TODO should consider deleting path instead of returning error,
// once all codes become ready for csi 1.0.
return nil, status.Error(codes.NotFound, "Volume not mounted")
}

Expand All @@ -116,14 +157,33 @@ func (ns *nodeServer) NodeUnpublishVolume(ctx context.Context, req *csi.NodeUnpu
return nil, status.Error(codes.Internal, err.Error())
}

// Bind mounted device needs to be resolved by using resolveBindMountedBlockDevice
if devicePath == "devtmpfs" {
var err error
devicePath, err = resolveBindMountedBlockDevice(targetPath)
if err != nil {
return nil, status.Error(codes.Internal, err.Error())
}
glog.V(4).Infof("NodeUnpublishVolume: devicePath: %s, (original)cnt: %d\n", devicePath, cnt)
// cnt for GetDeviceNameFromMount is broken for bind mouted device,
// it counts total number of mounted "devtmpfs", instead of counting this device.
// So, forcibly setting cnt to 1 here.
// TODO : fix this properly
cnt = 1
}

glog.V(4).Infof("NodeUnpublishVolume: targetPath: %s, devicePath: %s\n", targetPath, devicePath)

// Unmounting the image
err = ns.mounter.Unmount(targetPath)
if err != nil {
glog.V(3).Infof("failed to unmount targetPath: %s with error: %v", targetPath, err)
return nil, status.Error(codes.Internal, err.Error())
}

cnt--
if cnt != 0 {
// TODO should this be fixed not to success, so that driver can retry unmounting?
return &csi.NodeUnpublishVolumeResponse{}, nil
}

Expand All @@ -133,6 +193,12 @@ func (ns *nodeServer) NodeUnpublishVolume(ctx context.Context, req *csi.NodeUnpu
return nil, err
}

// Remove targetPath
if err := os.RemoveAll(targetPath); err != nil {
glog.V(3).Infof("failed to remove targetPath: %s with error: %v", targetPath, err)
return nil, err
}

return &csi.NodeUnpublishVolumeResponse{}, nil
}

Expand All @@ -151,3 +217,30 @@ func (ns *nodeServer) NodeUnstageVolume(

return nil, status.Error(codes.Unimplemented, "")
}

func resolveBindMountedBlockDevice(mountPath string) (string, error) {
cmd := exec.Command("findmnt", "-n", "-o", "SOURCE", "--first-only", "--target", mountPath)
out, err := cmd.CombinedOutput()
if err != nil {
glog.V(2).Infof("Failed findmnt command for path %s: %s %v", mountPath, out, err)
return "", err
}
return parseFindMntResolveSource(string(out))
}

// parse output of "findmnt -o SOURCE --first-only --target" and return just the SOURCE
func parseFindMntResolveSource(out string) (string, error) {
// cut trailing newline
out = strings.TrimSuffix(out, "\n")
// Check if out is a mounted device
reMnt := regexp.MustCompile("^(/[^/]+(?:/[^/]*)*)$")
if match := reMnt.FindStringSubmatch(out); match != nil {
return match[1], nil
}
// Check if out is a block device
reBlk := regexp.MustCompile("^devtmpfs\\[(/[^/]+(?:/[^/]*)*)\\]$")
if match := reBlk.FindStringSubmatch(out); match != nil {
return fmt.Sprintf("/dev%s", match[1]), nil
}
return "", fmt.Errorf("parseFindMntResolveSource: %s doesn't match to any expected findMnt output", out)
}