From b88f7c1123c6f379b23e9db665d8cf3bad1a7041 Mon Sep 17 00:00:00 2001 From: Xiang Li Date: Thu, 7 Jan 2021 15:27:37 -0800 Subject: [PATCH] add volume metrics --- go.mod | 1 + pkg/driver/mocks/mock_statter.go | 68 ++++++++++++++++++++++++++++ pkg/driver/mount.go | 6 +++ pkg/driver/node.go | 78 +++++++++++++++++++++++++++++++- pkg/driver/node_test.go | 64 ++++++++++++++++++-------- pkg/driver/sanity_test.go | 56 +++++++++++++++++++++-- pkg/driver/statter.go | 74 ++++++++++++++++++++++++++++++ pkg/driver/statter_test.go | 58 ++++++++++++++++++++++++ 8 files changed, 380 insertions(+), 25 deletions(-) create mode 100644 pkg/driver/mocks/mock_statter.go create mode 100644 pkg/driver/statter.go create mode 100644 pkg/driver/statter_test.go diff --git a/go.mod b/go.mod index 2acb11cc67..f6f7162684 100644 --- a/go.mod +++ b/go.mod @@ -13,6 +13,7 @@ require ( github.com/onsi/ginkgo v1.10.2 github.com/onsi/gomega v1.7.0 github.com/tmc/grpc-websocket-proxy v0.0.0-20171017195756-830351dc03c6 // indirect + golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f google.golang.org/grpc v1.26.0 k8s.io/api v0.17.3 k8s.io/apimachinery v0.17.3 diff --git a/pkg/driver/mocks/mock_statter.go b/pkg/driver/mocks/mock_statter.go new file mode 100644 index 0000000000..3af7e12809 --- /dev/null +++ b/pkg/driver/mocks/mock_statter.go @@ -0,0 +1,68 @@ +// Code generated by MockGen. DO NOT EDIT. +// Source: ./pkg/driver/statter.go + +// Package mocks is a generated GoMock package. +package mocks + +import ( + gomock "github.com/golang/mock/gomock" + reflect "reflect" +) + +// MockStatter is a mock of Statter interface +type MockStatter struct { + ctrl *gomock.Controller + recorder *MockStatterMockRecorder +} + +// MockStatterMockRecorder is the mock recorder for MockStatter +type MockStatterMockRecorder struct { + mock *MockStatter +} + +// NewMockStatter creates a new mock instance +func NewMockStatter(ctrl *gomock.Controller) *MockStatter { + mock := &MockStatter{ctrl: ctrl} + mock.recorder = &MockStatterMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use +func (m *MockStatter) EXPECT() *MockStatterMockRecorder { + return m.recorder +} + +// StatFS mocks base method +func (m *MockStatter) StatFS(path string) (int64, int64, int64, int64, int64, int64, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "StatFS", path) + ret0, _ := ret[0].(int64) + ret1, _ := ret[1].(int64) + ret2, _ := ret[2].(int64) + ret3, _ := ret[3].(int64) + ret4, _ := ret[4].(int64) + ret5, _ := ret[5].(int64) + ret6, _ := ret[6].(error) + return ret0, ret1, ret2, ret3, ret4, ret5, ret6 +} + +// StatFS indicates an expected call of StatFS +func (mr *MockStatterMockRecorder) StatFS(path interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "StatFS", reflect.TypeOf((*MockStatter)(nil).StatFS), path) +} + +// IsBlockDevice mocks base method +func (m *MockStatter) IsBlockDevice(arg0 string) (bool, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "IsBlockDevice", arg0) + ret0, _ := ret[0].(bool) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// IsBlockDevice indicates an expected call of IsBlockDevice +func (mr *MockStatterMockRecorder) IsBlockDevice(arg0 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "IsBlockDevice", reflect.TypeOf((*MockStatter)(nil).IsBlockDevice), arg0) +} diff --git a/pkg/driver/mount.go b/pkg/driver/mount.go index 7270e2b435..4c505963b5 100644 --- a/pkg/driver/mount.go +++ b/pkg/driver/mount.go @@ -53,6 +53,8 @@ func (m *NodeMounter) GetDeviceName(mountPath string) (string, int, error) { return mount.GetDeviceNameFromMount(m, mountPath) } +// This function is mirrored in ./sanity_test.go to make sure sanity test covered this block of code +// Please mirror the change to func MakeFile in ./sanity_test.go func (m *NodeMounter) MakeFile(pathname string) error { f, err := os.OpenFile(pathname, os.O_CREATE, os.FileMode(0644)) if err != nil { @@ -66,6 +68,8 @@ func (m *NodeMounter) MakeFile(pathname string) error { return nil } +// This function is mirrored in ./sanity_test.go to make sure sanity test covered this block of code +// Please mirror the change to func MakeFile in ./sanity_test.go func (m *NodeMounter) MakeDir(pathname string) error { err := os.MkdirAll(pathname, os.FileMode(0755)) if err != nil { @@ -76,6 +80,8 @@ func (m *NodeMounter) MakeDir(pathname string) error { return nil } +// This function is mirrored in ./sanity_test.go to make sure sanity test covered this block of code +// Please mirror the change to func MakeFile in ./sanity_test.go func (m *NodeMounter) ExistsPath(filename string) (bool, error) { if _, err := os.Stat(filename); os.IsNotExist(err) { return false, nil diff --git a/pkg/driver/node.go b/pkg/driver/node.go index 2dc3dc28f2..47056f3edf 100644 --- a/pkg/driver/node.go +++ b/pkg/driver/node.go @@ -22,6 +22,7 @@ import ( "os" "path/filepath" "regexp" + "strconv" "strings" csi "github.com/container-storage-interface/spec/lib/go/csi" @@ -65,6 +66,7 @@ var ( nodeCaps = []csi.NodeServiceCapability_RPC_Type{ csi.NodeServiceCapability_RPC_STAGE_UNSTAGE_VOLUME, csi.NodeServiceCapability_RPC_EXPAND_VOLUME, + csi.NodeServiceCapability_RPC_GET_VOLUME_STATS, } ) @@ -72,6 +74,7 @@ var ( type nodeService struct { metadata cloud.MetadataService mounter Mounter + statter Statter inFlight *internal.InFlight driverOptions *DriverOptions } @@ -87,6 +90,7 @@ func newNodeService(driverOptions *DriverOptions) nodeService { return nodeService{ metadata: metadata, mounter: newNodeMounter(), + statter: NewStatter(), inFlight: internal.NewInFlight(), driverOptions: driverOptions, } @@ -344,7 +348,65 @@ func (d *nodeService) NodeUnpublishVolume(ctx context.Context, req *csi.NodeUnpu } func (d *nodeService) NodeGetVolumeStats(ctx context.Context, req *csi.NodeGetVolumeStatsRequest) (*csi.NodeGetVolumeStatsResponse, error) { - return nil, status.Error(codes.Unimplemented, "NodeGetVolumeStats is not implemented yet") + klog.V(4).Infof("NodeGetVolumeStats: called with args %+v", *req) + if len(req.VolumeId) == 0 { + return nil, status.Error(codes.InvalidArgument, "NodeGetVolumeStats volume ID was empty") + } + if len(req.VolumePath) == 0 { + return nil, status.Error(codes.InvalidArgument, "NodeGetVolumeStats volume path was empty") + } + + exists, err := d.mounter.ExistsPath(req.VolumePath) + if err != nil { + return nil, status.Errorf(codes.Internal, "unknown error when stat on %s: %v", req.VolumePath, err) + } + if !exists { + return nil, status.Errorf(codes.NotFound, "path %s does not exist", req.VolumePath) + } + + isBlock, err := d.statter.IsBlockDevice(req.VolumePath) + + if err != nil { + return nil, status.Errorf(codes.Internal, "failed to determine whether %s is block device: %v", req.VolumePath, err) + } + if isBlock { + bcap, err := d.getBlockSizeBytes(req.VolumePath) + if err != nil { + return nil, status.Errorf(codes.Internal, "failed to get block capacity on path %s: %v", req.VolumePath, err) + } + return &csi.NodeGetVolumeStatsResponse{ + Usage: []*csi.VolumeUsage{ + { + Unit: csi.VolumeUsage_BYTES, + Total: bcap, + }, + }, + }, nil + } + + available, capacity, used, inodesFree, inodes, inodesUsed, err := d.statter.StatFS(req.VolumePath) + if err != nil { + return nil, status.Errorf(codes.Internal, "failed to get fs info on path %s: %v", req.VolumePath, err) + } + + return &csi.NodeGetVolumeStatsResponse{ + Usage: []*csi.VolumeUsage{ + { + Available: available, + Total: capacity, + Used: used, + Unit: csi.VolumeUsage_BYTES, + }, + + { + Available: inodesFree, + Total: inodes, + Used: inodesUsed, + Unit: csi.VolumeUsage_INODES, + }, + }, + }, nil + } func (d *nodeService) NodeGetCapabilities(ctx context.Context, req *csi.NodeGetCapabilitiesRequest) (*csi.NodeGetCapabilitiesResponse, error) { @@ -550,3 +612,17 @@ func hasMountOption(options []string, opt string) bool { } return false } + +func (d *nodeService) getBlockSizeBytes(devicePath string) (int64, error) { + cmd := d.mounter.Command("blockdev", "--getsize64", devicePath) + output, err := cmd.Output() + if err != nil { + return -1, fmt.Errorf("error when getting size of block volume at path %s: output: %s, err: %v", devicePath, string(output), err) + } + strOut := strings.TrimSpace(string(output)) + gotSizeBytes, err := strconv.ParseInt(strOut, 10, 64) + if err != nil { + return -1, fmt.Errorf("failed to parse size %s as int", strOut) + } + return gotSizeBytes, nil +} diff --git a/pkg/driver/node_test.go b/pkg/driver/node_test.go index 7430d5fd1a..d0d59a0dfb 100644 --- a/pkg/driver/node_test.go +++ b/pkg/driver/node_test.go @@ -1170,32 +1170,49 @@ func TestNodeUnpublishVolume(t *testing.T) { } func TestNodeGetVolumeStats(t *testing.T) { - mockCtl := gomock.NewController(t) - defer mockCtl.Finish() + testCases := []struct { + name string + testFunc func(t *testing.T) + }{ + { + name: "success normal", + testFunc: func(t *testing.T) { + mockCtl := gomock.NewController(t) + defer mockCtl.Finish() - mockMetadata := mocks.NewMockMetadataService(mockCtl) - mockMounter := mocks.NewMockMounter(mockCtl) + mockMetadata := mocks.NewMockMetadataService(mockCtl) + mockMounter := mocks.NewMockMounter(mockCtl) + mockStatter := mocks.NewMockStatter(mockCtl) + VolumePath := "/test" + one := int64(1) - awsDriver := nodeService{ - metadata: mockMetadata, - mounter: mockMounter, - inFlight: internal.NewInFlight(), - } + mockMounter.EXPECT().ExistsPath(VolumePath).Return(true, nil) + mockStatter.EXPECT().IsBlockDevice(VolumePath).Return(false, nil) + mockStatter.EXPECT().StatFS(VolumePath).Return(one, one, one, one, one, one, nil) - expErrCode := codes.Unimplemented + awsDriver := nodeService{ + metadata: mockMetadata, + mounter: mockMounter, + statter: mockStatter, + inFlight: internal.NewInFlight(), + } - req := &csi.NodeGetVolumeStatsRequest{} - _, err := awsDriver.NodeGetVolumeStats(context.TODO(), req) - if err == nil { - t.Fatalf("Expected error code %d, got nil", expErrCode) - } - srvErr, ok := status.FromError(err) - if !ok { - t.Fatalf("Could not get error status code from error: %v", srvErr) + req := &csi.NodeGetVolumeStatsRequest{ + VolumeId: "vol-test", + VolumePath: VolumePath, + } + _, err := awsDriver.NodeGetVolumeStats(context.TODO(), req) + if err != nil { + t.Fatalf("Expected no error but got err %v", err) + } + }, + }, } - if srvErr.Code() != expErrCode { - t.Fatalf("Expected error code %d, got %d message %s", expErrCode, srvErr.Code(), srvErr.Message()) + + for _, tc := range testCases { + t.Run(tc.name, tc.testFunc) } + } func TestNodeGetCapabilities(t *testing.T) { @@ -1226,6 +1243,13 @@ func TestNodeGetCapabilities(t *testing.T) { }, }, }, + { + Type: &csi.NodeServiceCapability_Rpc{ + Rpc: &csi.NodeServiceCapability_RPC{ + Type: csi.NodeServiceCapability_RPC_GET_VOLUME_STATS, + }, + }, + }, } expResp := &csi.NodeGetCapabilitiesResponse{Capabilities: caps} diff --git a/pkg/driver/sanity_test.go b/pkg/driver/sanity_test.go index 8afb81dc63..9b1cee083a 100644 --- a/pkg/driver/sanity_test.go +++ b/pkg/driver/sanity_test.go @@ -26,14 +26,16 @@ func TestSanity(t *testing.T) { } defer os.RemoveAll(dir) - targetPath := filepath.Join(dir, "target") + targetPath := filepath.Join(dir, "mount") stagingPath := filepath.Join(dir, "staging") endpoint := "unix://" + filepath.Join(dir, "csi.sock") config := &sanity.Config{ - TargetPath: targetPath, - StagingPath: stagingPath, - Address: endpoint, + TargetPath: targetPath, + StagingPath: stagingPath, + Address: endpoint, + CreateTargetDir: createDir, + CreateStagingDir: createDir, } driverOptions := &DriverOptions{ @@ -54,6 +56,7 @@ func TestSanity(t *testing.T) { AvailabilityZone: "az", }, mounter: newFakeMounter(), + statter: NewFakeStatter(), inFlight: internal.NewInFlight(), driverOptions: &DriverOptions{}, }, @@ -73,6 +76,15 @@ func TestSanity(t *testing.T) { sanity.Test(t, config) } +func createDir(targetPath string) (string, error) { + if err := os.MkdirAll(targetPath, 0300); err != nil { + if os.IsNotExist(err) { + return "", err + } + } + return targetPath, nil +} + type fakeCloudProvider struct { disks map[string]*fakeDisk // snapshots contains mapping from snapshot ID to snapshot @@ -309,13 +321,49 @@ func (f *fakeMounter) GetDeviceName(mountPath string) (string, int, error) { } func (f *fakeMounter) MakeFile(pathname string) error { + file, err := os.OpenFile(pathname, os.O_CREATE, os.FileMode(0644)) + if err != nil { + if !os.IsExist(err) { + return err + } + } + if err = file.Close(); err != nil { + return err + } return nil } func (f *fakeMounter) MakeDir(pathname string) error { + err := os.MkdirAll(pathname, os.FileMode(0755)) + if err != nil { + if !os.IsExist(err) { + return err + } + } return nil } func (f *fakeMounter) ExistsPath(filename string) (bool, error) { + if _, err := os.Stat(filename); os.IsNotExist(err) { + return false, nil + } else if err != nil { + return false, err + } return true, nil } + +type fakeStatter struct{} + +func NewFakeStatter() fakeStatter { + return fakeStatter{} +} + +func (fakeStatter) StatFS(path string) (available, capacity, used, inodesFree, inodes, inodesUsed int64, err error) { + // Assume the file exists and give some dummy values back + one := int64(1) + return one, one, one, one, one, one, nil +} + +func (fakeStatter) IsBlockDevice(fullPath string) (bool, error) { + return false, nil +} diff --git a/pkg/driver/statter.go b/pkg/driver/statter.go new file mode 100644 index 0000000000..8482033e27 --- /dev/null +++ b/pkg/driver/statter.go @@ -0,0 +1,74 @@ +/* +Copyright 2019 The Kubernetes Authors. +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package driver + +import ( + "fmt" + "golang.org/x/sys/unix" + "runtime" +) + +type Statter interface { + StatFS(path string) (int64, int64, int64, int64, int64, int64, error) + IsBlockDevice(string) (bool, error) +} + +var _ Statter = statter{} + +type statter struct {} + +func NewStatter() statter { + return statter{} +} + +// IsBlock checks if the given path is a block device +func (statter) IsBlockDevice(fullPath string) (bool, error) { + if runtime.GOOS == "windows" { + return false, nil + } + var st unix.Stat_t + err := unix.Stat(fullPath, &st) + if err != nil { + return false, err + } + + return (st.Mode & unix.S_IFMT) == unix.S_IFBLK, nil +} + +func (statter) StatFS(path string) (available, capacity, used, inodesFree, inodes, inodesUsed int64, err error) { + if runtime.GOOS == "windows" { + zero := int64(0) + return zero, zero, zero, zero, zero, zero, fmt.Errorf("Not implemented") + } + statfs := &unix.Statfs_t{} + err = unix.Statfs(path, statfs) + if err != nil { + err = fmt.Errorf("failed to get fs info on path %s: %v", path, err) + return + } + + // Available is blocks available * fragment size (aka block size), unit: BYTES + available = int64(statfs.Bavail) * int64(statfs.Bsize) + // Capacity is total block count * fragment size, unit: BYTES + capacity = int64(statfs.Blocks) * int64(statfs.Bsize) + // Used is block being used * fragment size, unit: BYTES + used = (int64(statfs.Blocks) - int64(statfs.Bfree)) * int64(statfs.Bsize) + // inodes is total file nodes in filesystem, unit: INODES + inodes = int64(statfs.Files) + // inodesFree is free nodes available, unit: INODES + inodesFree = int64(statfs.Ffree) + // inodesUsed is used file nodes, unit: INODES + inodesUsed = inodes - inodesFree + return +} diff --git a/pkg/driver/statter_test.go b/pkg/driver/statter_test.go new file mode 100644 index 0000000000..8d50e212db --- /dev/null +++ b/pkg/driver/statter_test.go @@ -0,0 +1,58 @@ +/* +Copyright 2020 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package driver + +import ( + "io/ioutil" + "os" + "testing" +) + +func TestIsBlockingDevice(t *testing.T) { + // Setup the full driver and its environment + dir, err := ioutil.TempDir("", "blocking-device-ebs-csi") + if err != nil { + t.Fatalf("error creating directory %v", err) + } + defer os.RemoveAll(dir) + + var ( + testStatter = NewStatter() + ) + + _, err = testStatter.IsBlockDevice(dir) + if err != nil { + t.Fatalf("Expect no error but got: %v", err) + } +} + +func TestStatFS(t *testing.T) { + // Setup the full driver and its environment + dir, err := ioutil.TempDir("", "blocking-stat-ebs-csi") + if err != nil { + t.Fatalf("error creating directory %v", err) + } + defer os.RemoveAll(dir) + + var ( + testStatter = NewStatter() + ) + _, _, _, _, _, _, err = testStatter.StatFS(dir) + if err != nil { + t.Fatalf("Expect no error but got: %v", err) + } +}