diff --git a/docs/modify-volume.md b/docs/modify-volume.md index d3515f6776..168bdc4458 100644 --- a/docs/modify-volume.md +++ b/docs/modify-volume.md @@ -20,7 +20,6 @@ Users can specify the following PVC annotations: ## Considerations - Keep in mind the [6 hour cooldown period](https://docs.aws.amazon.com/AWSEC2/latest/APIReference/API_ModifyVolume.html) for EBS ModifyVolume. Multiple ModifyVolume calls for the same volume within a 6 hour period will fail. -- It is not yet possible to update both the annotations and capacity of the PVC at the same time. This results in multiple RPC calls to the driver, and only one of them will succeed (due to the cooldown period). A future release of the driver will lift this restriction. - Ensure that the desired volume properties are permissible. The driver does minimum client side validation. ## Example diff --git a/pkg/cloud/cloud.go b/pkg/cloud/cloud.go index a6798369b0..5fdf7af351 100644 --- a/pkg/cloud/cloud.go +++ b/pkg/cloud/cloud.go @@ -467,35 +467,68 @@ func (c *cloud) CreateDisk(ctx context.Context, volumeName string, diskOptions * return &Disk{CapacityGiB: size, VolumeID: volumeID, AvailabilityZone: zone, SnapshotID: snapshotID, OutpostArn: outpostArn}, nil } -func (c *cloud) ModifyDisk(ctx context.Context, volumeID string, options *ModifyDiskOptions) error { - klog.V(4).InfoS("Received ModifyDisk request", "volumeID", volumeID, "options", options) +// ResizeOrModifyDisk resizes an EBS volume in GiB increments, rouding up to the next possible allocatable unit, and/or modifies an EBS +// volume with the parameters in ModifyDiskOptions. +// The resizing operation is performed only when newSizeBytes != 0. +// It returns the volume size after this call or an error if the size couldn't be determined or the volume couldn't be modified. 
+func (c *cloud) ResizeOrModifyDisk(ctx context.Context, volumeID string, newSizeBytes int64, options *ModifyDiskOptions) (int64, error) { + if newSizeBytes != 0 { + klog.V(4).InfoS("Received Resize and/or Modify Disk request", "volumeID", volumeID, "newSizeBytes", newSizeBytes, "options", options) + } else { + klog.V(4).InfoS("Received Modify Disk request", "volumeID", volumeID, "options", options) + } + + newSizeGiB := util.RoundUpGiB(newSizeBytes) + var oldSizeGiB int64 + if newSizeBytes != 0 { + volumeSize, err := c.validateVolumeSize(ctx, volumeID, newSizeGiB) + if err != nil || volumeSize != 0 { + return volumeSize, err + } + } + req := &ec2.ModifyVolumeInput{ VolumeId: aws.String(volumeID), } - + // Only set req.size for resizing volume + if newSizeBytes != 0 { + req.Size = aws.Int64(newSizeGiB) + } if options.IOPS != 0 { req.Iops = aws.Int64(int64(options.IOPS)) } if options.VolumeType != "" { req.VolumeType = aws.String(options.VolumeType) } - if options.Throughput != 0 { + if options.VolumeType == VolumeTypeGP3 { req.Throughput = aws.Int64(int64(options.Throughput)) } - res, err := c.ec2.ModifyVolumeWithContext(ctx, req) + response, err := c.ec2.ModifyVolumeWithContext(ctx, req) if err != nil { - return fmt.Errorf("unable to modify AWS volume %q: %w", volumeID, err) + return 0, fmt.Errorf("unable to modify AWS volume %q: %w", volumeID, err) } - mod := res.VolumeModification + mod := response.VolumeModification state := aws.StringValue(mod.ModificationState) if volumeModificationDone(state) { - return nil + if newSizeBytes != 0 { + return c.checkDesiredSize(ctx, volumeID, newSizeGiB) + } else { + return 0, nil + } } - return c.waitForVolumeSize(ctx, volumeID) + err = c.waitForVolumeSize(ctx, volumeID) + if newSizeBytes != 0 { + if err != nil { + return oldSizeGiB, err + } + return c.checkDesiredSize(ctx, volumeID, newSizeGiB) + } else { + return 0, c.waitForVolumeSize(ctx, volumeID) + } } func (c *cloud) DeleteDisk(ctx context.Context, volumeID string) 
(bool, error) { @@ -1134,78 +1167,6 @@ func isAWSErrorIdempotentParameterMismatch(err error) bool { return isAWSError(err, "IdempotentParameterMismatch") } -// ResizeDisk resizes an EBS volume in GiB increments, rouding up to the next possible allocatable unit. -// It returns the volume size after this call or an error if the size couldn't be determined. -func (c *cloud) ResizeDisk(ctx context.Context, volumeID string, newSizeBytes int64) (int64, error) { - request := &ec2.DescribeVolumesInput{ - VolumeIds: []*string{ - aws.String(volumeID), - }, - } - volume, err := c.getVolume(ctx, request) - if err != nil { - return 0, err - } - - // AWS resizes in chunks of GiB (not GB) - newSizeGiB := util.RoundUpGiB(newSizeBytes) - oldSizeGiB := aws.Int64Value(volume.Size) - - latestMod, modFetchError := c.getLatestVolumeModification(ctx, volumeID) - - if latestMod != nil && modFetchError == nil { - state := aws.StringValue(latestMod.ModificationState) - if state == ec2.VolumeModificationStateModifying { - err = c.waitForVolumeSize(ctx, volumeID) - if err != nil { - return oldSizeGiB, err - } - return c.checkDesiredSize(ctx, volumeID, newSizeGiB) - } - } - - // if there was an error fetching volume modifications and it was anything other than VolumeNotBeingModified error - // that means we have an API problem. - if modFetchError != nil && !errors.Is(modFetchError, VolumeNotBeingModified) { - return oldSizeGiB, fmt.Errorf("error fetching volume modifications for %q: %w", volumeID, modFetchError) - } - - // Even if existing volume size is greater than user requested size, we should ensure that there are no pending - // volume modifications objects or volume has completed previously issued modification request. 
- if oldSizeGiB >= newSizeGiB { - klog.V(5).InfoS("[Debug] Volume", "volumeID", volumeID, "oldSizeGiB", oldSizeGiB, "newSizeGiB", newSizeGiB) - err = c.waitForVolumeSize(ctx, volumeID) - if err != nil && !errors.Is(err, VolumeNotBeingModified) { - return oldSizeGiB, err - } - return oldSizeGiB, nil - } - - req := &ec2.ModifyVolumeInput{ - VolumeId: aws.String(volumeID), - Size: aws.Int64(newSizeGiB), - } - - klog.V(4).InfoS("expanding volume", "volumeID", volumeID, "newSizeGiB", newSizeGiB) - response, err := c.ec2.ModifyVolumeWithContext(ctx, req) - if err != nil { - return 0, fmt.Errorf("could not modify AWS volume %q: %w", volumeID, err) - } - - mod := response.VolumeModification - - state := aws.StringValue(mod.ModificationState) - if volumeModificationDone(state) { - return c.checkDesiredSize(ctx, volumeID, newSizeGiB) - } - - err = c.waitForVolumeSize(ctx, volumeID) - if err != nil { - return oldSizeGiB, err - } - return c.checkDesiredSize(ctx, volumeID, newSizeGiB) -} - // Checks for desired size on volume by also verifying volume size by describing volume. // This is to get around potential eventual consistency problems with describing volume modifications // objects and ensuring that we read two different objects to verify volume state. 
@@ -1310,6 +1271,51 @@ func (c *cloud) AvailabilityZones(ctx context.Context) (map[string]struct{}, err return zones, nil } +func (c *cloud) validateVolumeSize(ctx context.Context, volumeID string, newSizeGiB int64) (int64, error) { + request := &ec2.DescribeVolumesInput{ + VolumeIds: []*string{ + aws.String(volumeID), + }, + } + volume, err := c.getVolume(ctx, request) + if err != nil { + return 0, err + } + + oldSizeGiB := aws.Int64Value(volume.Size) + + latestMod, modFetchError := c.getLatestVolumeModification(ctx, volumeID) + + if latestMod != nil && modFetchError == nil { + state := aws.StringValue(latestMod.ModificationState) + if state == ec2.VolumeModificationStateModifying { + err = c.waitForVolumeSize(ctx, volumeID) + if err != nil { + return oldSizeGiB, err + } + return c.checkDesiredSize(ctx, volumeID, newSizeGiB) + } + } + + // if there was an error fetching volume modifications and it was anything other than VolumeNotBeingModified error + // that means we have an API problem. + if modFetchError != nil && !errors.Is(modFetchError, VolumeNotBeingModified) { + return oldSizeGiB, fmt.Errorf("error fetching volume modifications for %q: %w", volumeID, modFetchError) + } + + // Even if existing volume size is greater than user requested size, we should ensure that there are no pending + // volume modifications objects or volume has completed previously issued modification request. 
+ if oldSizeGiB >= newSizeGiB { + klog.V(5).InfoS("[Debug] Volume", "volumeID", volumeID, "oldSizeGiB", oldSizeGiB, "newSizeGiB", newSizeGiB) + err = c.waitForVolumeSize(ctx, volumeID) + if err != nil && !errors.Is(err, VolumeNotBeingModified) { + return oldSizeGiB, err + } + return oldSizeGiB, nil + } + return 0, nil +} + func volumeModificationDone(state string) bool { if state == ec2.VolumeModificationStateCompleted || state == ec2.VolumeModificationStateOptimizing { return true diff --git a/pkg/cloud/cloud_interface.go b/pkg/cloud/cloud_interface.go index 72852819ac..fad846d086 100644 --- a/pkg/cloud/cloud_interface.go +++ b/pkg/cloud/cloud_interface.go @@ -8,11 +8,10 @@ import ( type Cloud interface { CreateDisk(ctx context.Context, volumeName string, diskOptions *DiskOptions) (disk *Disk, err error) - ModifyDisk(ctx context.Context, volumeName string, modifyDiskOptions *ModifyDiskOptions) error DeleteDisk(ctx context.Context, volumeID string) (success bool, err error) AttachDisk(ctx context.Context, volumeID string, nodeID string) (devicePath string, err error) DetachDisk(ctx context.Context, volumeID string, nodeID string) (err error) - ResizeDisk(ctx context.Context, volumeID string, reqSize int64) (newSize int64, err error) + ResizeOrModifyDisk(ctx context.Context, volumeID string, newSizeBytes int64, options *ModifyDiskOptions) (newSize int64, err error) WaitForAttachmentState(ctx context.Context, volumeID, expectedState string, expectedInstance string, expectedDevice string, alreadyAssigned bool) (*ec2.VolumeAttachment, error) GetDiskByName(ctx context.Context, name string, capacityBytes int64) (disk *Disk, err error) GetDiskByID(ctx context.Context, volumeID string) (disk *Disk, err error) diff --git a/pkg/cloud/cloud_test.go b/pkg/cloud/cloud_test.go index 3b305625d0..e4361749cd 100644 --- a/pkg/cloud/cloud_test.go +++ b/pkg/cloud/cloud_test.go @@ -710,64 +710,6 @@ func TestDeleteDisk(t *testing.T) { } } -func TestModifyDisk(t *testing.T) { - testCases 
:= []struct { - name string - volumeID string - modifyDiskOptions *ModifyDiskOptions - modifiedVolume *ec2.ModifyVolumeOutput - modifiedVolumeError awserr.Error - }{ - { - name: "success: IOPS with GP3", - volumeID: "vol-test-1234", - modifyDiskOptions: &ModifyDiskOptions{ - VolumeType: "GP3", - IOPS: 3000, - }, - modifiedVolume: &ec2.ModifyVolumeOutput{ - VolumeModification: &ec2.VolumeModification{ - VolumeId: aws.String("vol-test"), - TargetIops: aws.Int64(3000), - ModificationState: aws.String(ec2.VolumeModificationStateCompleted), - }, - }, - }, - { - name: "fail: ModifyVolume returned generic error", - volumeID: "vol-test-1234", - modifiedVolumeError: awserr.New("InvalidParameterCombination", "The parameter iops is not supported for gp2 volumes", nil), - modifyDiskOptions: &ModifyDiskOptions{ - VolumeType: "GP2", - IOPS: 3000, - }, - }, - } - - for _, tc := range testCases { - t.Run(tc.name, func(t *testing.T) { - mockCtrl := gomock.NewController(t) - mockEC2 := NewMockEC2API(mockCtrl) - c := newCloud(mockEC2) - ctx := context.Background() - if tc.modifiedVolumeError != nil { - mockEC2.EXPECT().ModifyVolumeWithContext(gomock.Any(), gomock.Any()).Return(nil, tc.modifiedVolumeError).AnyTimes() - } else { - mockEC2.EXPECT().ModifyVolumeWithContext(gomock.Any(), gomock.Any()).Return(tc.modifiedVolume, nil).AnyTimes() - } - err := c.ModifyDisk(ctx, tc.volumeID, tc.modifyDiskOptions) - if err != nil && tc.modifiedVolumeError == nil { - t.Fatalf("ModifyDisk() failed: expected no error, got: %v", err) - } - - if err == nil && tc.modifiedVolumeError != nil { - t.Fatal("ModifyDisk() failed: expected error, got nothing") - } - mockCtrl.Finish() - }) - } -} - func TestAttachDisk(t *testing.T) { testCases := []struct { name string @@ -1387,7 +1329,7 @@ func TestDeleteSnapshot(t *testing.T) { } } -func TestResizeDisk(t *testing.T) { +func TestResizeOrModifyDisk(t *testing.T) { testCases := []struct { name string volumeID string @@ -1397,10 +1339,11 @@ func TestResizeDisk(t 
*testing.T) { modifiedVolumeError awserr.Error descModVolume *ec2.DescribeVolumesModificationsOutput reqSizeGiB int64 + modifyDiskOptions *ModifyDiskOptions expErr error }{ { - name: "success: normal", + name: "success: normal resize", volumeID: "vol-test", existingVolume: &ec2.Volume{ VolumeId: aws.String("vol-test"), @@ -1414,8 +1357,9 @@ func TestResizeDisk(t *testing.T) { ModificationState: aws.String(ec2.VolumeModificationStateCompleted), }, }, - reqSizeGiB: 2, - expErr: nil, + reqSizeGiB: 2, + modifyDiskOptions: &ModifyDiskOptions{}, + expErr: nil, }, { name: "success: normal modifying state", @@ -1441,8 +1385,9 @@ func TestResizeDisk(t *testing.T) { }, }, }, - reqSizeGiB: 2, - expErr: nil, + reqSizeGiB: 2, + modifyDiskOptions: &ModifyDiskOptions{}, + expErr: nil, }, { name: "success: with previous expansion", @@ -1461,8 +1406,54 @@ func TestResizeDisk(t *testing.T) { }, }, }, + reqSizeGiB: 2, + modifyDiskOptions: &ModifyDiskOptions{}, + expErr: nil, + }, + { + name: "success: modify IOPS, throughput and volume type", + volumeID: "vol-test", + modifyDiskOptions: &ModifyDiskOptions{ + VolumeType: "GP3", + IOPS: 3000, + Throughput: 1000, + }, + modifiedVolume: &ec2.ModifyVolumeOutput{ + VolumeModification: &ec2.VolumeModification{ + VolumeId: aws.String("vol-test"), + TargetVolumeType: aws.String("GP3"), + TargetIops: aws.Int64(3000), + TargetThroughput: aws.Int64(1000), + ModificationState: aws.String(ec2.VolumeModificationStateCompleted), + }, + }, + expErr: nil, + }, + { + name: "success: modify size, IOPS, throughput and volume type", + volumeID: "vol-test", + existingVolume: &ec2.Volume{ + VolumeId: aws.String("vol-test"), + Size: aws.Int64(1), + AvailabilityZone: aws.String(defaultZone), + }, + modifyDiskOptions: &ModifyDiskOptions{ + VolumeType: "GP3", + IOPS: 3000, + Throughput: 1000, + }, reqSizeGiB: 2, - expErr: nil, + modifiedVolume: &ec2.ModifyVolumeOutput{ + VolumeModification: &ec2.VolumeModification{ + VolumeId: aws.String("vol-test"), + 
TargetSize: aws.Int64(2), + TargetVolumeType: aws.String("GP3"), + TargetIops: aws.Int64(3000), + TargetThroughput: aws.Int64(1000), + ModificationState: aws.String(ec2.VolumeModificationStateCompleted), + }, + }, + expErr: nil, }, { name: "fail: volume doesn't exist", @@ -1491,6 +1482,16 @@ func TestResizeDisk(t *testing.T) { reqSizeGiB: 2, expErr: fmt.Errorf("ResizeDisk generic error"), }, + { + name: "failure: ModifyVolume returned generic error", + volumeID: "vol-test", + modifyDiskOptions: &ModifyDiskOptions{ + VolumeType: "GP2", + IOPS: 3000, + }, + modifiedVolumeError: awserr.New("InvalidParameterCombination", "The parameter iops is not supported for gp2 volumes", nil), + expErr: awserr.New("InvalidParameterCombination", "The parameter iops is not supported for gp2 volumes", nil), + }, } for _, tc := range testCases { @@ -1534,17 +1535,17 @@ func TestResizeDisk(t *testing.T) { mockEC2.EXPECT().DescribeVolumesModificationsWithContext(gomock.Any(), gomock.Any()).Return(emptyOutput, nil).AnyTimes() } - newSize, err := c.ResizeDisk(ctx, tc.volumeID, util.GiBToBytes(tc.reqSizeGiB)) + newSize, err := c.ResizeOrModifyDisk(ctx, tc.volumeID, util.GiBToBytes(tc.reqSizeGiB), tc.modifyDiskOptions) if err != nil { if tc.expErr == nil { - t.Fatalf("ResizeDisk() failed: expected no error, got: %v", err) + t.Fatalf("ResizeOrModifyDisk() failed: expected no error, got: %v", err) } } else { if tc.expErr != nil { - t.Fatal("ResizeDisk() failed: expected error, got nothing") + t.Fatal("ResizeOrModifyDisk() failed: expected error, got nothing") } else { if tc.reqSizeGiB != newSize { - t.Fatalf("ResizeDisk() failed: expected capacity %d, got %d", tc.reqSizeGiB, newSize) + t.Fatalf("ResizeOrModifyDisk() failed: expected capacity %d, got %d", tc.reqSizeGiB, newSize) } } } diff --git a/pkg/cloud/mock_cloud.go b/pkg/cloud/mock_cloud.go index 9acb9376db..0f88ec04a4 100644 --- a/pkg/cloud/mock_cloud.go +++ b/pkg/cloud/mock_cloud.go @@ -243,33 +243,19 @@ func (mr *MockCloudMockRecorder) 
ListSnapshots(ctx, volumeID, maxResults, nextTo return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ListSnapshots", reflect.TypeOf((*MockCloud)(nil).ListSnapshots), ctx, volumeID, maxResults, nextToken) } -// ModifyDisk mocks base method. -func (m *MockCloud) ModifyDisk(ctx context.Context, volumeName string, modifyDiskOptions *ModifyDiskOptions) error { +// ResizeOrModifyDisk mocks base method. +func (m *MockCloud) ResizeOrModifyDisk(ctx context.Context, volumeID string, newSizeBytes int64, options *ModifyDiskOptions) (int64, error) { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "ModifyDisk", ctx, volumeName, modifyDiskOptions) - ret0, _ := ret[0].(error) - return ret0 -} - -// ModifyDisk indicates an expected call of ModifyDisk. -func (mr *MockCloudMockRecorder) ModifyDisk(ctx, volumeName, modifyDiskOptions interface{}) *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ModifyDisk", reflect.TypeOf((*MockCloud)(nil).ModifyDisk), ctx, volumeName, modifyDiskOptions) -} - -// ResizeDisk mocks base method. -func (m *MockCloud) ResizeDisk(ctx context.Context, volumeID string, reqSize int64) (int64, error) { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "ResizeDisk", ctx, volumeID, reqSize) + ret := m.ctrl.Call(m, "ResizeOrModifyDisk", ctx, volumeID, newSizeBytes, options) ret0, _ := ret[0].(int64) ret1, _ := ret[1].(error) return ret0, ret1 } -// ResizeDisk indicates an expected call of ResizeDisk. -func (mr *MockCloudMockRecorder) ResizeDisk(ctx, volumeID, reqSize interface{}) *gomock.Call { +// ResizeOrModifyDisk indicates an expected call of ResizeOrModifyDisk. 
+func (mr *MockCloudMockRecorder) ResizeOrModifyDisk(ctx, volumeID, newSizeBytes, options interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ResizeDisk", reflect.TypeOf((*MockCloud)(nil).ResizeDisk), ctx, volumeID, reqSize) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ResizeOrModifyDisk", reflect.TypeOf((*MockCloud)(nil).ResizeOrModifyDisk), ctx, volumeID, newSizeBytes, options) } // WaitForAttachmentState mocks base method. diff --git a/pkg/driver/controller.go b/pkg/driver/controller.go index 7104a83c38..3022d564be 100644 --- a/pkg/driver/controller.go +++ b/pkg/driver/controller.go @@ -61,9 +61,10 @@ const isManagedByDriver = "true" // controllerService represents the controller service of CSI driver type controllerService struct { - cloud cloud.Cloud - inFlight *internal.InFlight - driverOptions *DriverOptions + cloud cloud.Cloud + inFlight *internal.InFlight + driverOptions *DriverOptions + modifyVolumeManager *modifyVolumeManager rpc.UnimplementedModifyServer } @@ -97,9 +98,10 @@ func newControllerService(driverOptions *DriverOptions) controllerService { } return controllerService{ - cloud: cloudSrv, - inFlight: internal.NewInFlight(), - driverOptions: driverOptions, + cloud: cloudSrv, + inFlight: internal.NewInFlight(), + driverOptions: driverOptions, + modifyVolumeManager: newModifyVolumeManager(), } } @@ -508,9 +510,26 @@ func (d *controllerService) ControllerExpandVolume(ctx context.Context, req *csi return nil, status.Error(codes.InvalidArgument, "After round-up, volume size exceeds the limit specified") } - actualSizeGiB, err := d.cloud.ResizeDisk(ctx, volumeID, newSize) - if err != nil { - return nil, status.Errorf(codes.Internal, "Could not resize volume %q: %v", volumeID, err) + responseChan := make(chan modifyVolumeResponse) + modifyVolumeRequest := modifyVolumeRequest{ + newSize: newSize, + responseChan: responseChan, + } + + // Intentionally not pass in context as we deal with 
context locally in this method + d.addModifyVolumeRequest(volumeID, &modifyVolumeRequest) //nolint:contextcheck + + var actualSizeGiB int64 + + select { + case response := <-responseChan: + if response.err != nil { + return nil, status.Errorf(codes.Internal, "Could not resize volume %q: %v", volumeID, response.err) + } else { + actualSizeGiB = response.volumeSize + } + case <-ctx.Done(): + return nil, status.Errorf(codes.Internal, "Could not resize volume %q: context cancelled", volumeID) } nodeExpansionRequired := true diff --git a/pkg/driver/controller_modify_volume.go b/pkg/driver/controller_modify_volume.go index ba2db7f561..93ba89abe5 100644 --- a/pkg/driver/controller_modify_volume.go +++ b/pkg/driver/controller_modify_volume.go @@ -2,7 +2,10 @@ package driver import ( "context" + "fmt" "strconv" + "sync" + "time" "github.com/awslabs/volume-modifier-for-k8s/pkg/rpc" "github.com/kubernetes-sigs/aws-ebs-csi-driver/pkg/cloud" @@ -17,8 +20,159 @@ const ( ModificationKeyIOPS = "iops" ModificationKeyThroughput = "throughput" + + modifyVolumeRequestHandlerTimeout = 2 * time.Second ) +type modifyVolumeRequest struct { + newSize int64 + modifyDiskOptions cloud.ModifyDiskOptions + // Channel for sending the response to the request caller + responseChan chan modifyVolumeResponse +} + +type modifyVolumeResponse struct { + volumeSize int64 + err error +} + +type modifyVolumeRequestHandler struct { + volumeID string + // Merged request from the requests that have been accepted for the volume + mergedRequest *modifyVolumeRequest + // Channel for sending requests to the goroutine for the volume + requestChan chan *modifyVolumeRequest +} + +type modifyVolumeManager struct { + // Map of volume ID to modifyVolumeRequestHandler + requestHandlerMap sync.Map +} + +func newModifyVolumeManager() *modifyVolumeManager { + return &modifyVolumeManager{ + requestHandlerMap: sync.Map{}, + } +} + +func newModifyVolumeRequestHandler(volumeID string, request *modifyVolumeRequest) 
modifyVolumeRequestHandler { + requestChan := make(chan *modifyVolumeRequest) + return modifyVolumeRequestHandler{ + requestChan: requestChan, + mergedRequest: request, + volumeID: volumeID, + } +} + +// This function validates the new request against the merged request for the volume. +// If the new request has a volume property that's already included in the merged request and its value is different from that in the merged request, +// this function will return an error and the new request will be rejected. +func (h *modifyVolumeRequestHandler) validateModifyVolumeRequest(r *modifyVolumeRequest) error { + if r.newSize != 0 && h.mergedRequest.newSize != 0 && r.newSize != h.mergedRequest.newSize { + return fmt.Errorf("Different size was requested by a previous request. Current: %d, Requested: %d", h.mergedRequest.newSize, r.newSize) + } + if r.modifyDiskOptions.IOPS != 0 && h.mergedRequest.modifyDiskOptions.IOPS != 0 && r.modifyDiskOptions.IOPS != h.mergedRequest.modifyDiskOptions.IOPS { + return fmt.Errorf("Different IOPS was requested by a previous request. Current: %d, Requested: %d", h.mergedRequest.modifyDiskOptions.IOPS, r.modifyDiskOptions.IOPS) + } + if r.modifyDiskOptions.Throughput != 0 && h.mergedRequest.modifyDiskOptions.Throughput != 0 && r.modifyDiskOptions.Throughput != h.mergedRequest.modifyDiskOptions.Throughput { + return fmt.Errorf("Different throughput was requested by a previous request. Current: %d, Requested: %d", h.mergedRequest.modifyDiskOptions.Throughput, r.modifyDiskOptions.Throughput) + } + if r.modifyDiskOptions.VolumeType != "" && h.mergedRequest.modifyDiskOptions.VolumeType != "" && r.modifyDiskOptions.VolumeType != h.mergedRequest.modifyDiskOptions.VolumeType { + return fmt.Errorf("Different volume type was requested by a previous request. 
Current: %s, Requested: %s", h.mergedRequest.modifyDiskOptions.VolumeType, r.modifyDiskOptions.VolumeType) + } + return nil +} + +func (h *modifyVolumeRequestHandler) mergeModifyVolumeRequest(r *modifyVolumeRequest) { + if r.newSize != 0 { + h.mergedRequest.newSize = r.newSize + } + if r.modifyDiskOptions.IOPS != 0 { + h.mergedRequest.modifyDiskOptions.IOPS = r.modifyDiskOptions.IOPS + } + if r.modifyDiskOptions.Throughput != 0 { + h.mergedRequest.modifyDiskOptions.Throughput = r.modifyDiskOptions.Throughput + } + if r.modifyDiskOptions.VolumeType != "" { + h.mergedRequest.modifyDiskOptions.VolumeType = r.modifyDiskOptions.VolumeType + } +} + +// processModifyVolumeRequests method starts its execution with a timer that has modifyVolumeRequestHandlerTimeout as its timeout value. +// When the Timer times out, it calls the ec2 API to perform the volume modification. processModifyVolumeRequests method sends back the response of +// the ec2 API call to the CSI Driver main thread via response channels. +// This method receives requests from CSI driver main thread via the request channel. When a new request is received from the request channel, we first +// validate the new request. If the new request is acceptable, it will be merged with the existing request for the volume. 
+func (d *controllerService) processModifyVolumeRequests(h *modifyVolumeRequestHandler, responseChans []chan modifyVolumeResponse) { + klog.V(4).InfoS("Start processing ModifyVolumeRequest for ", "volume ID", h.volumeID) + process := func(req *modifyVolumeRequest) { + if err := h.validateModifyVolumeRequest(req); err != nil { + req.responseChan <- modifyVolumeResponse{err: err} + } else { + h.mergeModifyVolumeRequest(req) + responseChans = append(responseChans, req.responseChan) + } + } + + for { + select { + case req := <-h.requestChan: + process(req) + case <-time.After(modifyVolumeRequestHandlerTimeout): + d.modifyVolumeManager.requestHandlerMap.Delete(h.volumeID) + // At this point, no new requests can come in on the request channel because it has been removed from the map + // However, the request channel may still have requests waiting on it + // Thus, process any requests still waiting in the channel + for loop := true; loop; { + select { + case req := <-h.requestChan: + process(req) + default: + loop = false + } + } + actualSizeGiB, err := d.executeModifyVolumeRequest(h.volumeID, h.mergedRequest) + for _, c := range responseChans { + select { + case c <- modifyVolumeResponse{volumeSize: actualSizeGiB, err: err}: + default: + klog.V(6).InfoS("Ignoring response channel because it has no receiver", "volumeID", h.volumeID) + } + } + return + } + } +} + +// When a new request comes in, we look up requestHandlerMap using the volume ID of the request. +// If there's no ModifyVolumeRequestHandler for the volume, meaning that there’s no inflight requests for the volume, we will start a goroutine +// for the volume calling processModifyVolumeRequests method, and ModifyVolumeRequestHandler for the volume will be added to requestHandlerMap. +// If there’s ModifyVolumeRequestHandler for the volume, meaning that there is inflight request(s) for the volume, we will send the new request +// to the goroutine for the volume via the receiving channel. 
+// Note that each volume with inflight requests has their own goroutine which follows timeout schedule of their own. +func (d *controllerService) addModifyVolumeRequest(volumeID string, r *modifyVolumeRequest) { + requestHandler := newModifyVolumeRequestHandler(volumeID, r) + handler, loaded := d.modifyVolumeManager.requestHandlerMap.LoadOrStore(volumeID, requestHandler) + if loaded { + h := handler.(modifyVolumeRequestHandler) + h.requestChan <- r + } else { + responseChans := []chan modifyVolumeResponse{r.responseChan} + go d.processModifyVolumeRequests(&requestHandler, responseChans) + } +} + +func (d *controllerService) executeModifyVolumeRequest(volumeID string, req *modifyVolumeRequest) (int64, error) { + ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second) + defer cancel() + actualSizeGiB, err := d.cloud.ResizeOrModifyDisk(ctx, volumeID, req.newSize, &req.modifyDiskOptions) + if err != nil { + return 0, status.Errorf(codes.Internal, "Could not modify volume %q: %v", volumeID, err) + } else { + return actualSizeGiB, nil + } +} + func (d *controllerService) GetCSIDriverModificationCapability( _ context.Context, _ *rpc.GetCSIDriverModificationCapabilityRequest, @@ -30,8 +184,8 @@ func (d *controllerService) ModifyVolumeProperties( ctx context.Context, req *rpc.ModifyVolumePropertiesRequest, ) (*rpc.ModifyVolumePropertiesResponse, error) { - klog.V(4).InfoS("ModifyVolumeAttributes called", "req", req) - if err := validateModifyVolumeAttributesRequest(req); err != nil { + klog.V(4).InfoS("ModifyVolumeProperties called", "req", req) + if err := validateModifyVolumePropertiesRequest(req); err != nil { return nil, err } @@ -55,13 +209,29 @@ func (d *controllerService) ModifyVolumeProperties( modifyOptions.VolumeType = value } } - if err := d.cloud.ModifyDisk(ctx, name, &modifyOptions); err != nil { - return nil, status.Errorf(codes.Internal, "Could not modify volume %q: %v", name, err) + + responseChan := make(chan modifyVolumeResponse) + 
request := modifyVolumeRequest{ + modifyDiskOptions: modifyOptions, + responseChan: responseChan, + } + + // Intentionally not pass in context as we deal with context locally in this method + d.addModifyVolumeRequest(name, &request) //nolint:contextcheck + + select { + case response := <-responseChan: + if response.err != nil { + return nil, status.Errorf(codes.Internal, "Could not modify volume %q: %v", name, response.err) + } + case <-ctx.Done(): + return nil, status.Errorf(codes.Internal, "Could not modify volume %q: context cancelled", name) } + return &rpc.ModifyVolumePropertiesResponse{}, nil } -func validateModifyVolumeAttributesRequest(req *rpc.ModifyVolumePropertiesRequest) error { +func validateModifyVolumePropertiesRequest(req *rpc.ModifyVolumePropertiesRequest) error { name := req.GetName() if name == "" { return status.Error(codes.InvalidArgument, "Volume name not provided") diff --git a/pkg/driver/controller_test.go b/pkg/driver/controller_test.go index 8759d257ac..3d29682b8f 100644 --- a/pkg/driver/controller_test.go +++ b/pkg/driver/controller_test.go @@ -3418,12 +3418,13 @@ func TestControllerExpandVolume(t *testing.T) { } mockCloud := cloud.NewMockCloud(mockCtl) - mockCloud.EXPECT().ResizeDisk(gomock.Eq(ctx), gomock.Eq(tc.req.VolumeId), gomock.Any()).Return(retSizeGiB, nil).AnyTimes() + mockCloud.EXPECT().ResizeOrModifyDisk(gomock.Any(), gomock.Eq(tc.req.VolumeId), gomock.Any(), gomock.Any()).Return(retSizeGiB, nil).AnyTimes() awsDriver := controllerService{ - cloud: mockCloud, - inFlight: internal.NewInFlight(), - driverOptions: &DriverOptions{}, + cloud: mockCloud, + inFlight: internal.NewInFlight(), + driverOptions: &DriverOptions{}, + modifyVolumeManager: newModifyVolumeManager(), } resp, err := awsDriver.ControllerExpandVolume(ctx, tc.req) diff --git a/pkg/driver/request_coalescing_test.go b/pkg/driver/request_coalescing_test.go new file mode 100644 index 0000000000..ec7cfeaf2b --- /dev/null +++ b/pkg/driver/request_coalescing_test.go @@ -0,0 
+1,500 @@
+package driver
+
+import (
+	"context"
+	"fmt"
+	// "errors"
+	"sync"
+	"testing"
+	"time"
+
+	"github.com/awslabs/volume-modifier-for-k8s/pkg/rpc"
+	"github.com/container-storage-interface/spec/lib/go/csi"
+	"github.com/kubernetes-sigs/aws-ebs-csi-driver/pkg/driver/internal"
+	"github.com/kubernetes-sigs/aws-ebs-csi-driver/pkg/util"
+	"k8s.io/klog/v2"
+
+	"github.com/golang/mock/gomock"
+	"github.com/kubernetes-sigs/aws-ebs-csi-driver/pkg/cloud"
+)
+
+// TestBasicRequestCoalescingSuccess tests the success case of coalescing 2 requests from
+// ControllerExpandVolume and ModifyVolumeProperties respectively: the mock expects a single
+// ResizeOrModifyDisk call that carries both the new size and the new volume type.
+func TestBasicRequestCoalescingSuccess(t *testing.T) {
+	const NewVolumeType = "gp3"
+	const NewSize = 5 * util.GiB
+	volumeID := t.Name()
+
+	mockCtl := gomock.NewController(t)
+	defer mockCtl.Finish()
+
+	mockCloud := cloud.NewMockCloud(mockCtl)
+	// No Times(...) modifier: gomock expects exactly one call, i.e. the two RPCs must be merged.
+	mockCloud.EXPECT().ResizeOrModifyDisk(gomock.Any(), gomock.Eq(volumeID), gomock.Any(), gomock.Any()).DoAndReturn(func(_ context.Context, volumeID string, newSize int64, options *cloud.ModifyDiskOptions) (int64, error) {
+		klog.InfoS("ResizeOrModifyDisk called", "volumeID", volumeID, "newSize", newSize, "options", options)
+		// Check size and type independently so that one mismatch does not hide the other.
+		if newSize != NewSize {
+			t.Errorf("newSize incorrect")
+		}
+		if options.VolumeType != NewVolumeType {
+			t.Errorf("VolumeType incorrect")
+		}
+
+		return newSize, nil
+	})
+
+	awsDriver := controllerService{
+		cloud:               mockCloud,
+		inFlight:            internal.NewInFlight(),
+		driverOptions:       &DriverOptions{},
+		modifyVolumeManager: newModifyVolumeManager(),
+	}
+
+	var wg sync.WaitGroup
+	wg.Add(2)
+
+	go wrapTimeout(t, "ControllerExpandVolume timed out", func() {
+		// Deferred so the WaitGroup is released on every exit path of the closure.
+		defer wg.Done()
+		_, err := awsDriver.ControllerExpandVolume(context.Background(), &csi.ControllerExpandVolumeRequest{
+			VolumeId: volumeID,
+			CapacityRange: &csi.CapacityRange{
+				RequiredBytes: NewSize,
+			},
+		})
+
+		if err != nil {
+			t.Error("ControllerExpandVolume returned error")
+		}
+	})
+	go wrapTimeout(t, "ModifyVolumeProperties timed out", func() {
+		defer wg.Done()
+		_, err := awsDriver.ModifyVolumeProperties(context.Background(), &rpc.ModifyVolumePropertiesRequest{
+			Name: volumeID,
+			Parameters: map[string]string{
+				ModificationKeyVolumeType: NewVolumeType,
+			},
+		})
+
+		if err != nil {
+			t.Error("ModifyVolumeProperties returned error")
+		}
+	})
+
+	wg.Wait()
+}
+
+// TestRequestFail tests failing requests from ResizeOrModifyDisk failure: the single mocked
+// cloud call returns an error and both RPC callers are expected to observe an error.
+func TestRequestFail(t *testing.T) {
+	const NewVolumeType = "gp3"
+	const NewSize = 5 * util.GiB
+	volumeID := t.Name()
+
+	mockCtl := gomock.NewController(t)
+	defer mockCtl.Finish()
+
+	mockCloud := cloud.NewMockCloud(mockCtl)
+	mockCloud.EXPECT().ResizeOrModifyDisk(gomock.Any(), gomock.Eq(volumeID), gomock.Any(), gomock.Any()).DoAndReturn(func(_ context.Context, volumeID string, newSize int64, options *cloud.ModifyDiskOptions) (int64, error) {
+		klog.InfoS("ResizeOrModifyDisk called", "volumeID", volumeID, "newSize", newSize, "options", options)
+		return 0, fmt.Errorf("ResizeOrModifyDisk failed")
+	})
+
+	awsDriver := controllerService{
+		cloud:               mockCloud,
+		inFlight:            internal.NewInFlight(),
+		driverOptions:       &DriverOptions{},
+		modifyVolumeManager: newModifyVolumeManager(),
+	}
+
+	var wg sync.WaitGroup
+	wg.Add(2)
+
+	go wrapTimeout(t, "ControllerExpandVolume timed out", func() {
+		defer wg.Done()
+		_, err := awsDriver.ControllerExpandVolume(context.Background(), &csi.ControllerExpandVolumeRequest{
+			VolumeId: volumeID,
+			CapacityRange: &csi.CapacityRange{
+				RequiredBytes: NewSize,
+			},
+		})
+
+		if err == nil {
+			t.Error("ControllerExpandVolume should fail")
+		}
+	})
+	go wrapTimeout(t, "ModifyVolumeProperties timed out", func() {
+		defer wg.Done()
+		_, err := awsDriver.ModifyVolumeProperties(context.Background(), &rpc.ModifyVolumePropertiesRequest{
+			Name: volumeID,
+			Parameters: map[string]string{
+				ModificationKeyVolumeType: NewVolumeType,
+			},
+		})
+
+		if err == nil {
+			t.Error("ModifyVolumeProperties should fail")
+		}
+	})
+
+	wg.Wait()
+}
+
+// TestPartialFail tests making these 3 requests roughly in parallel:
+// 1) Change size
+// 2) Change volume type to NewVolumeType1
+// 3) Change volume type to NewVolumeType2
+// The expected result is the resizing request succeeds and one of the volume-type requests fails.
+func TestPartialFail(t *testing.T) {
+	const NewVolumeType1 = "gp3"
+	const NewVolumeType2 = "io2"
+	const NewSize = 5 * util.GiB
+	volumeID := t.Name()
+
+	mockCtl := gomock.NewController(t)
+	defer mockCtl.Finish()
+
+	// Records which of the two conflicting volume types reached the cloud call.
+	volumeTypeChosen := ""
+
+	mockCloud := cloud.NewMockCloud(mockCtl)
+	mockCloud.EXPECT().ResizeOrModifyDisk(gomock.Any(), gomock.Eq(volumeID), gomock.Any(), gomock.Any()).DoAndReturn(func(_ context.Context, volumeID string, newSize int64, options *cloud.ModifyDiskOptions) (int64, error) {
+		klog.InfoS("ResizeOrModifyDisk called", "volumeID", volumeID, "newSize", newSize, "options", options)
+		// Check size and type independently so that one mismatch does not hide the other.
+		if newSize != NewSize {
+			t.Errorf("newSize incorrect")
+		}
+		if options.VolumeType == "" {
+			t.Errorf("no volume type")
+		}
+
+		volumeTypeChosen = options.VolumeType
+		return newSize, nil
+	})
+
+	awsDriver := controllerService{
+		cloud:               mockCloud,
+		inFlight:            internal.NewInFlight(),
+		driverOptions:       &DriverOptions{},
+		modifyVolumeManager: newModifyVolumeManager(),
+	}
+
+	var wg sync.WaitGroup
+	wg.Add(3)
+
+	// Each flag is written by exactly one request goroutine and read only after wg.Wait().
+	volumeType1Err, volumeType2Error := false, false
+
+	go wrapTimeout(t, "ControllerExpandVolume timed out", func() {
+		defer wg.Done()
+		_, err := awsDriver.ControllerExpandVolume(context.Background(), &csi.ControllerExpandVolumeRequest{
+			VolumeId: volumeID,
+			CapacityRange: &csi.CapacityRange{
+				RequiredBytes: NewSize,
+			},
+		})
+
+		if err != nil {
+			t.Error("ControllerExpandVolume returned error")
+		}
+	})
+	go wrapTimeout(t, "ModifyVolumeProperties timed out", func() {
+		defer wg.Done()
+		_, err := awsDriver.ModifyVolumeProperties(context.Background(), &rpc.ModifyVolumePropertiesRequest{
+			Name: volumeID,
+			Parameters: map[string]string{
+				ModificationKeyVolumeType: NewVolumeType1, // gp3
+			},
+		})
+		volumeType1Err = err != nil
+	})
+	go wrapTimeout(t, "ModifyVolumeProperties timed out", func() {
+		defer wg.Done()
+		_, err := awsDriver.ModifyVolumeProperties(context.Background(), &rpc.ModifyVolumePropertiesRequest{
+			Name: volumeID,
+			Parameters: map[string]string{
+				ModificationKeyVolumeType: NewVolumeType2, // io2
+			},
+		})
+		if err != nil {
+			klog.InfoS("Got err io2")
+		}
+		volumeType2Error = err != nil
+	})
+
+	wg.Wait()
+
+	// The request whose type was applied must have succeeded; the conflicting one must have failed.
+	switch volumeTypeChosen {
+	case NewVolumeType1:
+		if volumeType1Err {
+			t.Error("Controller chose", NewVolumeType1, "but errored request")
+		}
+		if !volumeType2Error {
+			t.Error("Controller chose", NewVolumeType1, "but returned success to", NewVolumeType2, "request")
+		}
+	case NewVolumeType2:
+		if volumeType2Error {
+			t.Error("Controller chose", NewVolumeType2, "but errored request")
+		}
+		if !volumeType1Err {
+			t.Error("Controller chose", NewVolumeType2, "but returned success to", NewVolumeType1, "request")
+		}
+	default:
+		t.Error("No volume type chosen")
+	}
+}
+
+// TestSequentialRequests tests sending 2 requests sequentially.
+func TestSequentialRequests(t *testing.T) {
+	const NewVolumeType = "gp3"
+	const NewSize = 5 * util.GiB
+	volumeID := t.Name()
+
+	mockCtl := gomock.NewController(t)
+	defer mockCtl.Finish()
+
+	mockCloud := cloud.NewMockCloud(mockCtl)
+	// Times(2): the two requests are issued one after the other, so each must result in its own
+	// ResizeOrModifyDisk call instead of being merged into one.
+	mockCloud.EXPECT().ResizeOrModifyDisk(gomock.Any(), gomock.Eq(volumeID), gomock.Any(), gomock.Any()).DoAndReturn(func(_ context.Context, volumeID string, newSize int64, options *cloud.ModifyDiskOptions) (int64, error) {
+		klog.InfoS("ResizeOrModifyDisk", "volumeID", volumeID, "newSize", newSize, "options", options)
+		return newSize, nil
+	}).Times(2)
+
+	awsDriver := controllerService{
+		cloud:               mockCloud,
+		inFlight:            internal.NewInFlight(),
+		driverOptions:       &DriverOptions{},
+		modifyVolumeManager: newModifyVolumeManager(),
+	}
+
+	var wg sync.WaitGroup
+	wg.Add(2)
+
+	// expandReturned is closed once ControllerExpandVolume has returned, i.e. after the first
+	// request has been fully served.
+	expandReturned := make(chan struct{})
+
+	go wrapTimeout(t, "ControllerExpandVolume timed out", func() {
+		defer wg.Done()
+		defer close(expandReturned)
+		_, err := awsDriver.ControllerExpandVolume(context.Background(), &csi.ControllerExpandVolumeRequest{
+			VolumeId: volumeID,
+			CapacityRange: &csi.CapacityRange{
+				RequiredBytes: NewSize,
+			},
+		})
+
+		if err != nil {
+			t.Error("ControllerExpandVolume returned error")
+		}
+	})
+
+	// Wait for the first request to complete before issuing the second one. This replaces a fixed
+	// 5 second sleep: the ordering no longer depends on how long the controller waits before
+	// calling the cloud, and the test does not idle longer than necessary.
+	<-expandReturned
+
+	go wrapTimeout(t, "ModifyVolumeProperties timed out", func() {
+		defer wg.Done()
+		_, err := awsDriver.ModifyVolumeProperties(context.Background(), &rpc.ModifyVolumePropertiesRequest{
+			Name: volumeID,
+			Parameters: map[string]string{
+				ModificationKeyVolumeType: NewVolumeType,
+			},
+		})
+
+		if err != nil {
+			t.Error("ModifyVolumeProperties returned error")
+		}
+	})
+
+	wg.Wait()
+}
+
+// TestDuplicateRequest tests sending multiple same requests roughly in parallel.
+func TestDuplicateRequest(t *testing.T) {
+	const (
+		NewSize = 5 * util.GiB
+		// rounds is how many expand/modify request pairs are fired.
+		rounds = 5
+	)
+	volumeID := t.Name()
+
+	ctrl := gomock.NewController(t)
+	defer ctrl.Finish()
+
+	// The cloud call only logs its arguments and echoes the requested size back.
+	echoSize := func(_ context.Context, id string, newSize int64, options *cloud.ModifyDiskOptions) (int64, error) {
+		klog.InfoS("ResizeOrModifyDisk called", "volumeID", id, "newSize", newSize, "options", options)
+		return newSize, nil
+	}
+	cloudMock := cloud.NewMockCloud(ctrl)
+	cloudMock.EXPECT().ResizeOrModifyDisk(gomock.Any(), gomock.Eq(volumeID), gomock.Any(), gomock.Any()).DoAndReturn(echoSize)
+
+	svc := controllerService{
+		cloud:               cloudMock,
+		inFlight:            internal.NewInFlight(),
+		driverOptions:       &DriverOptions{},
+		modifyVolumeManager: newModifyVolumeManager(),
+	}
+
+	var pending sync.WaitGroup
+
+	// expand issues one resize request; every invocation builds its own request value.
+	expand := func() {
+		req := &csi.ControllerExpandVolumeRequest{
+			VolumeId:      volumeID,
+			CapacityRange: &csi.CapacityRange{RequiredBytes: NewSize},
+		}
+		if _, err := svc.ControllerExpandVolume(context.Background(), req); err != nil {
+			t.Error("Duplicate ControllerExpandVolume request should succeed")
+		}
+		pending.Done()
+	}
+	// modify issues one volume-type change request; every invocation builds its own request value.
+	modify := func() {
+		req := &rpc.ModifyVolumePropertiesRequest{
+			Name:       volumeID,
+			Parameters: map[string]string{ModificationKeyVolumeType: "io2"},
+		}
+		if _, err := svc.ModifyVolumeProperties(context.Background(), req); err != nil {
+			t.Error("Duplicate ModifyVolumeProperties request should succeed")
+		}
+		pending.Done()
+	}
+
+	for i := 0; i < rounds; i++ {
+		// Register both goroutines of this round before starting them.
+		pending.Add(2)
+		go wrapTimeout(t, "ControllerExpandVolume timed out", expand)
+		go wrapTimeout(t, "ModifyVolumeProperties timed out", modify)
+	}
+
+	pending.Wait()
+}
+
+// TestContextTimeout tests request failing due to context cancellation and the behavior of the following request.
+func TestContextTimeout(t *testing.T) {
+	const NewVolumeType = "gp3"
+	const NewSize = 5 * util.GiB
+	volumeID := t.Name()
+
+	mockCtl := gomock.NewController(t)
+	defer mockCtl.Finish()
+
+	mockCloud := cloud.NewMockCloud(mockCtl)
+	mockCloud.EXPECT().ResizeOrModifyDisk(gomock.Any(), gomock.Eq(volumeID), gomock.Any(), gomock.Any()).DoAndReturn(func(_ context.Context, volumeID string, newSize int64, options *cloud.ModifyDiskOptions) (int64, error) {
+		klog.InfoS("ResizeOrModifyDisk called", "volumeID", volumeID, "newSize", newSize, "options", options)
+		// Keep the cloud call in flight for a while after the first caller's context is cancelled.
+		time.Sleep(3 * time.Second)
+
+		// Controller could decide to coalesce the timed out request, or to drop it,
+		// so a size of 0 (no resize) is accepted as well as NewSize.
+		// Size and type are checked independently so that one mismatch does not hide the other.
+		if newSize != 0 && newSize != NewSize {
+			t.Errorf("newSize incorrect")
+		}
+		if options.VolumeType != NewVolumeType {
+			t.Errorf("volumeType incorrect")
+		}
+
+		return newSize, nil
+	})
+
+	awsDriver := controllerService{
+		cloud:               mockCloud,
+		inFlight:            internal.NewInFlight(),
+		driverOptions:       &DriverOptions{},
+		modifyVolumeManager: newModifyVolumeManager(),
+	}
+
+	var wg sync.WaitGroup
+	wg.Add(2)
+
+	ctx, cancel := context.WithCancel(context.Background())
+	go wrapTimeout(t, "ControllerExpandVolume timed out", func() {
+		// Deferred so the WaitGroup is released on every exit path of the closure.
+		defer wg.Done()
+		_, err := awsDriver.ControllerExpandVolume(ctx, &csi.ControllerExpandVolumeRequest{
+			VolumeId: volumeID,
+			CapacityRange: &csi.CapacityRange{
+				RequiredBytes: NewSize,
+			},
+		})
+		if err == nil {
+			t.Error("ControllerExpandVolume should return err because context is cancelled")
+		}
+	})
+
+	// Cancel the context (simulate a "sidecar timeout")
+	time.Sleep(500 * time.Millisecond)
+	cancel()
+
+	// The follow-up request uses a fresh, uncancelled context and is expected to succeed.
+	go wrapTimeout(t, "ModifyVolumeProperties timed out", func() {
+		defer wg.Done()
+		_, err := awsDriver.ModifyVolumeProperties(context.Background(), &rpc.ModifyVolumePropertiesRequest{
+			Name: volumeID,
+			Parameters: map[string]string{
+				ModificationKeyVolumeType: NewVolumeType,
+			},
+		})
+
+		if err != nil {
+			t.Error("ModifyVolumeProperties returned error")
+		}
+	})
+
+	wg.Wait()
+}
+
+// TestResponseReturnTiming tests the caller of request coalescing blocking until receiving response from cloud.ResizeOrModifyDisk
+func TestResponseReturnTiming(t *testing.T) {
+	const NewVolumeType = "gp3"
+	const NewSize = 5 * util.GiB
+	// Set by the mocked cloud call right before it returns; both RPC callers check it afterwards.
+	var ec2ModifyVolumeFinished = false
+	volumeID := t.Name()
+
+	mockCtl := gomock.NewController(t)
+	defer mockCtl.Finish()
+
+	mockCloud := cloud.NewMockCloud(mockCtl)
+	mockCloud.EXPECT().ResizeOrModifyDisk(gomock.Any(), gomock.Eq(volumeID), gomock.Any(), gomock.Any()).DoAndReturn(func(_ context.Context, volumeID string, newSize int64, options *cloud.ModifyDiskOptions) (int64, error) {
+		klog.InfoS("ResizeOrModifyDisk called", "volumeID", volumeID, "newSize", newSize, "options", options)
+
+		// Sleep to simulate ec2.ModifyVolume taking a long time
+		time.Sleep(5 * time.Second)
+		ec2ModifyVolumeFinished = true
+
+		return newSize, nil
+	})
+
+	awsDriver := controllerService{
+		cloud:               mockCloud,
+		inFlight:            internal.NewInFlight(),
+		driverOptions:       &DriverOptions{},
+		modifyVolumeManager: newModifyVolumeManager(),
+	}
+
+	var wg sync.WaitGroup
+	wg.Add(2)
+
+	go wrapTimeout(t, "ControllerExpandVolume timed out", func() {
+		// Deferred so the WaitGroup is released on every exit path of the closure.
+		defer wg.Done()
+		_, err := awsDriver.ControllerExpandVolume(context.Background(), &csi.ControllerExpandVolumeRequest{
+			VolumeId: volumeID,
+			CapacityRange: &csi.CapacityRange{
+				RequiredBytes: NewSize,
+			},
+		})
+
+		if !ec2ModifyVolumeFinished {
+			t.Error("ControllerExpandVolume returned success BEFORE ResizeOrModifyDisk returns")
+		}
+		if err != nil {
+			t.Error("ControllerExpandVolume returned error")
+		}
+	})
+	go wrapTimeout(t, "ModifyVolumeProperties timed out", func() {
+		defer wg.Done()
+		_, err := awsDriver.ModifyVolumeProperties(context.Background(), &rpc.ModifyVolumePropertiesRequest{
+			Name: volumeID,
+			Parameters: map[string]string{
+				ModificationKeyVolumeType: NewVolumeType,
+			},
+		})
+
+		if !ec2ModifyVolumeFinished {
+			t.Error("ModifyVolumeProperties returned success BEFORE ResizeOrModifyDisk returns")
+		}
+		if err != nil {
+			t.Error("ModifyVolumeProperties returned error")
+		}
+	})
+
+	wg.Wait()
+}
+
+// wrapTimeout runs execFunc in its own goroutine and waits for it to finish. If execFunc has not
+// returned within 15 seconds, failMessage is reported through t.Error and wrapTimeout returns
+// without waiting any further.
+func wrapTimeout(t *testing.T, failMessage string, execFunc func()) {
+	timeout := time.After(15 * time.Second)
+	// Buffered with capacity 1 so the worker can always deliver its completion signal and exit,
+	// even when nobody is receiving any more because the timeout branch was taken.
+	done := make(chan bool, 1)
+	go func() {
+		execFunc()
+		done <- true
+	}()
+
+	select {
+	case <-timeout:
+		t.Error(failMessage)
+	case <-done:
+	}
+}
diff --git a/pkg/driver/sanity_test.go b/pkg/driver/sanity_test.go
index bcf1a520ad..a6a3aeaeff 100644
--- a/pkg/driver/sanity_test.go
+++ b/pkg/driver/sanity_test.go
@@ -150,17 +150,6 @@ func (c *fakeCloudProvider) CreateDisk(ctx context.Context, volumeName string, d
 	return d.Disk, nil
 }
 
-func (c *fakeCloudProvider) ModifyDisk(ctx context.Context, volumeID string, options *cloud.ModifyDiskOptions) error {
-	if d, ok := c.disks[volumeID]; !ok {
-		return cloud.ErrNotFound
-	} else {
-		d.iops = options.IOPS
-		d.throughput = options.Throughput
-		d.volumeType = options.VolumeType
-	}
-	return nil
-}
-
 func (c *fakeCloudProvider) DeleteDisk(ctx context.Context, volumeID string) (bool, error) {
 	for volName, f := range c.disks {
 		if f.Disk.VolumeID == volumeID {
@@ -309,14 +298,23 @@ func (c *fakeCloudProvider) EnableFastSnapshotRestores(ctx context.Context, avai
 	return nil, nil
 }
 
-func (c *fakeCloudProvider) ResizeDisk(ctx context.Context, volumeID string, newSize int64) (int64, error) {
-	for volName, f := range c.disks {
-		if f.Disk.VolumeID == volumeID {
-			c.disks[volName].CapacityGiB = newSize
-			return newSize, nil
-		}
+// ResizeOrModifyDisk applies options to the fake disk whose VolumeID equals volumeID and, when
+// newSize is non-zero, records newSize as its capacity. It returns cloud.ErrNotFound if no disk matches.
+func (c *fakeCloudProvider) ResizeOrModifyDisk(ctx context.Context, volumeID string, newSize int64, options *cloud.ModifyDiskOptions) (int64, error) {
+	// c.disks is keyed by volume name (see DeleteDisk), so search by VolumeID instead of indexing.
+	for _, d := range c.disks {
+		if d.Disk.VolumeID != volumeID {
+			continue
+		}
+		d.iops = options.IOPS
+		d.throughput = options.Throughput
+		d.volumeType = options.VolumeType
+		if newSize != 0 {
+			d.Disk.CapacityGiB = newSize
+		}
+		return newSize, nil
 	}
 	return 0, cloud.ErrNotFound
 }
 
 type fakeMounter struct {