diff --git a/docs/modify-volume.md b/docs/modify-volume.md index d3515f6776..168bdc4458 100644 --- a/docs/modify-volume.md +++ b/docs/modify-volume.md @@ -20,7 +20,6 @@ Users can specify the following PVC annotations: ## Considerations - Keep in mind the [6 hour cooldown period](https://docs.aws.amazon.com/AWSEC2/latest/APIReference/API_ModifyVolume.html) for EBS ModifyVolume. Multiple ModifyVolume calls for the same volume within a 6 hour period will fail. -- It is not yet possible to update both the annotations and capacity of the PVC at the same time. This results in multiple RPC calls to the driver, and only one of them will succeed (due to the cooldown period). A future release of the driver will lift this restriction. - Ensure that the desired volume properties are permissible. The driver does minimum client side validation. ## Example diff --git a/pkg/cloud/cloud.go b/pkg/cloud/cloud.go index a6798369b0..5fdf7af351 100644 --- a/pkg/cloud/cloud.go +++ b/pkg/cloud/cloud.go @@ -467,35 +467,68 @@ func (c *cloud) CreateDisk(ctx context.Context, volumeName string, diskOptions * return &Disk{CapacityGiB: size, VolumeID: volumeID, AvailabilityZone: zone, SnapshotID: snapshotID, OutpostArn: outpostArn}, nil } -func (c *cloud) ModifyDisk(ctx context.Context, volumeID string, options *ModifyDiskOptions) error { - klog.V(4).InfoS("Received ModifyDisk request", "volumeID", volumeID, "options", options) +// ResizeOrModifyDisk resizes an EBS volume in GiB increments, rouding up to the next possible allocatable unit, and/or modifies an EBS +// volume with the parameters in ModifyDiskOptions. +// The resizing operation is performed only when newSizeBytes != 0. +// It returns the volume size after this call or an error if the size couldn't be determined or the volume couldn't be modified. +func (c *cloud) ResizeOrModifyDisk(ctx context.Context, volumeID string, newSizeBytes int64, options *ModifyDiskOptions) (int64, error) { + if newSizeBytes != 0 { + klog.V(4).InfoS("Received Resize and/or Modify Disk request", "volumeID", volumeID, "newSizeBytes", newSizeBytes, "options", options) + } else { + klog.V(4).InfoS("Received Modify Disk request", "volumeID", volumeID, "options", options) + } + + newSizeGiB := util.RoundUpGiB(newSizeBytes) + var oldSizeGiB int64 + if newSizeBytes != 0 { + volumeSize, err := c.validateVolumeSize(ctx, volumeID, newSizeGiB) + if err != nil || volumeSize != 0 { + return volumeSize, err + } + } + req := &ec2.ModifyVolumeInput{ VolumeId: aws.String(volumeID), } - + // Only set req.size for resizing volume + if newSizeBytes != 0 { + req.Size = aws.Int64(newSizeGiB) + } if options.IOPS != 0 { req.Iops = aws.Int64(int64(options.IOPS)) } if options.VolumeType != "" { req.VolumeType = aws.String(options.VolumeType) } - if options.Throughput != 0 { + if options.VolumeType == VolumeTypeGP3 { req.Throughput = aws.Int64(int64(options.Throughput)) } - res, err := c.ec2.ModifyVolumeWithContext(ctx, req) + response, err := c.ec2.ModifyVolumeWithContext(ctx, req) if err != nil { - return fmt.Errorf("unable to modify AWS volume %q: %w", volumeID, err) + return 0, fmt.Errorf("unable to modify AWS volume %q: %w", volumeID, err) } - mod := res.VolumeModification + mod := response.VolumeModification state := aws.StringValue(mod.ModificationState) if volumeModificationDone(state) { - return nil + if newSizeBytes != 0 { + return c.checkDesiredSize(ctx, volumeID, newSizeGiB) + } else { + return 0, nil + } } - return c.waitForVolumeSize(ctx, volumeID) + err = c.waitForVolumeSize(ctx, volumeID) + if newSizeBytes != 0 { + if err != nil { + return oldSizeGiB, err + } + return c.checkDesiredSize(ctx, volumeID, newSizeGiB) + } else { + return 0, c.waitForVolumeSize(ctx, volumeID) + } } func (c *cloud) DeleteDisk(ctx context.Context, volumeID string) (bool, error) { @@ -1134,78 +1167,6 @@ func isAWSErrorIdempotentParameterMismatch(err error) bool { return isAWSError(err, "IdempotentParameterMismatch") } -// ResizeDisk resizes an EBS volume in GiB increments, rouding up to the next possible allocatable unit. -// It returns the volume size after this call or an error if the size couldn't be determined. -func (c *cloud) ResizeDisk(ctx context.Context, volumeID string, newSizeBytes int64) (int64, error) { - request := &ec2.DescribeVolumesInput{ - VolumeIds: []*string{ - aws.String(volumeID), - }, - } - volume, err := c.getVolume(ctx, request) - if err != nil { - return 0, err - } - - // AWS resizes in chunks of GiB (not GB) - newSizeGiB := util.RoundUpGiB(newSizeBytes) - oldSizeGiB := aws.Int64Value(volume.Size) - - latestMod, modFetchError := c.getLatestVolumeModification(ctx, volumeID) - - if latestMod != nil && modFetchError == nil { - state := aws.StringValue(latestMod.ModificationState) - if state == ec2.VolumeModificationStateModifying { - err = c.waitForVolumeSize(ctx, volumeID) - if err != nil { - return oldSizeGiB, err - } - return c.checkDesiredSize(ctx, volumeID, newSizeGiB) - } - } - - // if there was an error fetching volume modifications and it was anything other than VolumeNotBeingModified error - // that means we have an API problem. - if modFetchError != nil && !errors.Is(modFetchError, VolumeNotBeingModified) { - return oldSizeGiB, fmt.Errorf("error fetching volume modifications for %q: %w", volumeID, modFetchError) - } - - // Even if existing volume size is greater than user requested size, we should ensure that there are no pending - // volume modifications objects or volume has completed previously issued modification request. - if oldSizeGiB >= newSizeGiB { - klog.V(5).InfoS("[Debug] Volume", "volumeID", volumeID, "oldSizeGiB", oldSizeGiB, "newSizeGiB", newSizeGiB) - err = c.waitForVolumeSize(ctx, volumeID) - if err != nil && !errors.Is(err, VolumeNotBeingModified) { - return oldSizeGiB, err - } - return oldSizeGiB, nil - } - - req := &ec2.ModifyVolumeInput{ - VolumeId: aws.String(volumeID), - Size: aws.Int64(newSizeGiB), - } - - klog.V(4).InfoS("expanding volume", "volumeID", volumeID, "newSizeGiB", newSizeGiB) - response, err := c.ec2.ModifyVolumeWithContext(ctx, req) - if err != nil { - return 0, fmt.Errorf("could not modify AWS volume %q: %w", volumeID, err) - } - - mod := response.VolumeModification - - state := aws.StringValue(mod.ModificationState) - if volumeModificationDone(state) { - return c.checkDesiredSize(ctx, volumeID, newSizeGiB) - } - - err = c.waitForVolumeSize(ctx, volumeID) - if err != nil { - return oldSizeGiB, err - } - return c.checkDesiredSize(ctx, volumeID, newSizeGiB) -} - // Checks for desired size on volume by also verifying volume size by describing volume. // This is to get around potential eventual consistency problems with describing volume modifications // objects and ensuring that we read two different objects to verify volume state. @@ -1310,6 +1271,51 @@ func (c *cloud) AvailabilityZones(ctx context.Context) (map[string]struct{}, err return zones, nil } +func (c *cloud) validateVolumeSize(ctx context.Context, volumeID string, newSizeGiB int64) (int64, error) { + request := &ec2.DescribeVolumesInput{ + VolumeIds: []*string{ + aws.String(volumeID), + }, + } + volume, err := c.getVolume(ctx, request) + if err != nil { + return 0, err + } + + oldSizeGiB := aws.Int64Value(volume.Size) + + latestMod, modFetchError := c.getLatestVolumeModification(ctx, volumeID) + + if latestMod != nil && modFetchError == nil { + state := aws.StringValue(latestMod.ModificationState) + if state == ec2.VolumeModificationStateModifying { + err = c.waitForVolumeSize(ctx, volumeID) + if err != nil { + return oldSizeGiB, err + } + return c.checkDesiredSize(ctx, volumeID, newSizeGiB) + } + } + + // if there was an error fetching volume modifications and it was anything other than VolumeNotBeingModified error + // that means we have an API problem. + if modFetchError != nil && !errors.Is(modFetchError, VolumeNotBeingModified) { + return oldSizeGiB, fmt.Errorf("error fetching volume modifications for %q: %w", volumeID, modFetchError) + } + + // Even if existing volume size is greater than user requested size, we should ensure that there are no pending + // volume modifications objects or volume has completed previously issued modification request. + if oldSizeGiB >= newSizeGiB { + klog.V(5).InfoS("[Debug] Volume", "volumeID", volumeID, "oldSizeGiB", oldSizeGiB, "newSizeGiB", newSizeGiB) + err = c.waitForVolumeSize(ctx, volumeID) + if err != nil && !errors.Is(err, VolumeNotBeingModified) { + return oldSizeGiB, err + } + return oldSizeGiB, nil + } + return 0, nil +} + func volumeModificationDone(state string) bool { if state == ec2.VolumeModificationStateCompleted || state == ec2.VolumeModificationStateOptimizing { return true diff --git a/pkg/cloud/cloud_interface.go b/pkg/cloud/cloud_interface.go index 72852819ac..fad846d086 100644 --- a/pkg/cloud/cloud_interface.go +++ b/pkg/cloud/cloud_interface.go @@ -8,11 +8,10 @@ import ( type Cloud interface { CreateDisk(ctx context.Context, volumeName string, diskOptions *DiskOptions) (disk *Disk, err error) - ModifyDisk(ctx context.Context, volumeName string, modifyDiskOptions *ModifyDiskOptions) error DeleteDisk(ctx context.Context, volumeID string) (success bool, err error) AttachDisk(ctx context.Context, volumeID string, nodeID string) (devicePath string, err error) DetachDisk(ctx context.Context, volumeID string, nodeID string) (err error) - ResizeDisk(ctx context.Context, volumeID string, reqSize int64) (newSize int64, err error) + ResizeOrModifyDisk(ctx context.Context, volumeID string, newSizeBytes int64, options *ModifyDiskOptions) (newSize int64, err error) WaitForAttachmentState(ctx context.Context, volumeID, expectedState string, expectedInstance string, expectedDevice string, alreadyAssigned bool) (*ec2.VolumeAttachment, error) GetDiskByName(ctx context.Context, name string, capacityBytes int64) (disk *Disk, err error) GetDiskByID(ctx context.Context, volumeID string) (disk *Disk, err error) diff --git a/pkg/cloud/cloud_test.go b/pkg/cloud/cloud_test.go index 3b305625d0..e4361749cd 100644 --- a/pkg/cloud/cloud_test.go +++ b/pkg/cloud/cloud_test.go @@ -710,64 +710,6 @@ func TestDeleteDisk(t *testing.T) { } } -func TestModifyDisk(t *testing.T) { - testCases := []struct { - name string - volumeID string - modifyDiskOptions *ModifyDiskOptions - modifiedVolume *ec2.ModifyVolumeOutput - modifiedVolumeError awserr.Error - }{ - { - name: "success: IOPS with GP3", - volumeID: "vol-test-1234", - modifyDiskOptions: &ModifyDiskOptions{ - VolumeType: "GP3", - IOPS: 3000, - }, - modifiedVolume: &ec2.ModifyVolumeOutput{ - VolumeModification: &ec2.VolumeModification{ - VolumeId: aws.String("vol-test"), - TargetIops: aws.Int64(3000), - ModificationState: aws.String(ec2.VolumeModificationStateCompleted), - }, - }, - }, - { - name: "fail: ModifyVolume returned generic error", - volumeID: "vol-test-1234", - modifiedVolumeError: awserr.New("InvalidParameterCombination", "The parameter iops is not supported for gp2 volumes", nil), - modifyDiskOptions: &ModifyDiskOptions{ - VolumeType: "GP2", - IOPS: 3000, - }, - }, - } - - for _, tc := range testCases { - t.Run(tc.name, func(t *testing.T) { - mockCtrl := gomock.NewController(t) - mockEC2 := NewMockEC2API(mockCtrl) - c := newCloud(mockEC2) - ctx := context.Background() - if tc.modifiedVolumeError != nil { - mockEC2.EXPECT().ModifyVolumeWithContext(gomock.Any(), gomock.Any()).Return(nil, tc.modifiedVolumeError).AnyTimes() - } else { - mockEC2.EXPECT().ModifyVolumeWithContext(gomock.Any(), gomock.Any()).Return(tc.modifiedVolume, nil).AnyTimes() - } - err := c.ModifyDisk(ctx, tc.volumeID, tc.modifyDiskOptions) - if err != nil && tc.modifiedVolumeError == nil { - t.Fatalf("ModifyDisk() failed: expected no error, got: %v", err) - } - - if err == nil && tc.modifiedVolumeError != nil { - t.Fatal("ModifyDisk() failed: expected error, got nothing") - } - mockCtrl.Finish() - }) - } -} - func TestAttachDisk(t *testing.T) { testCases := []struct { name string @@ -1387,7 +1329,7 @@ func TestDeleteSnapshot(t *testing.T) { } } -func TestResizeDisk(t *testing.T) { +func TestResizeOrModifyDisk(t *testing.T) { testCases := []struct { name string volumeID string @@ -1397,10 +1339,11 @@ func TestResizeDisk(t *testing.T) { modifiedVolumeError awserr.Error descModVolume *ec2.DescribeVolumesModificationsOutput reqSizeGiB int64 + modifyDiskOptions *ModifyDiskOptions expErr error }{ { - name: "success: normal", + name: "success: normal resize", volumeID: "vol-test", existingVolume: &ec2.Volume{ VolumeId: aws.String("vol-test"), @@ -1414,8 +1357,9 @@ func TestResizeDisk(t *testing.T) { ModificationState: aws.String(ec2.VolumeModificationStateCompleted), }, }, - reqSizeGiB: 2, - expErr: nil, + reqSizeGiB: 2, + modifyDiskOptions: &ModifyDiskOptions{}, + expErr: nil, }, { name: "success: normal modifying state", @@ -1441,8 +1385,9 @@ func TestResizeDisk(t *testing.T) { }, }, }, - reqSizeGiB: 2, - expErr: nil, + reqSizeGiB: 2, + modifyDiskOptions: &ModifyDiskOptions{}, + expErr: nil, }, { name: "success: with previous expansion", @@ -1461,8 +1406,54 @@ func TestResizeDisk(t *testing.T) { }, }, }, + reqSizeGiB: 2, + modifyDiskOptions: &ModifyDiskOptions{}, + expErr: nil, + }, + { + name: "success: modify IOPS, throughput and volume type", + volumeID: "vol-test", + modifyDiskOptions: &ModifyDiskOptions{ + VolumeType: "GP3", + IOPS: 3000, + Throughput: 1000, + }, + modifiedVolume: &ec2.ModifyVolumeOutput{ + VolumeModification: &ec2.VolumeModification{ + VolumeId: aws.String("vol-test"), + TargetVolumeType: aws.String("GP3"), + TargetIops: aws.Int64(3000), + TargetThroughput: aws.Int64(1000), + ModificationState: aws.String(ec2.VolumeModificationStateCompleted), + }, + }, + expErr: nil, + }, + { + name: "success: modify size, IOPS, throughput and volume type", + volumeID: "vol-test", + existingVolume: &ec2.Volume{ + VolumeId: aws.String("vol-test"), + Size: aws.Int64(1), + AvailabilityZone: aws.String(defaultZone), + }, + modifyDiskOptions: &ModifyDiskOptions{ + VolumeType: "GP3", + IOPS: 3000, + Throughput: 1000, + }, reqSizeGiB: 2, - expErr: nil, + modifiedVolume: &ec2.ModifyVolumeOutput{ + VolumeModification: &ec2.VolumeModification{ + VolumeId: aws.String("vol-test"), + TargetSize: aws.Int64(2), + TargetVolumeType: aws.String("GP3"), + TargetIops: aws.Int64(3000), + TargetThroughput: aws.Int64(1000), + ModificationState: aws.String(ec2.VolumeModificationStateCompleted), + }, + }, + expErr: nil, }, { name: "fail: volume doesn't exist", @@ -1491,6 +1482,16 @@ func TestResizeDisk(t *testing.T) { reqSizeGiB: 2, expErr: fmt.Errorf("ResizeDisk generic error"), }, + { + name: "failure: ModifyVolume returned generic error", + volumeID: "vol-test", + modifyDiskOptions: &ModifyDiskOptions{ + VolumeType: "GP2", + IOPS: 3000, + }, + modifiedVolumeError: awserr.New("InvalidParameterCombination", "The parameter iops is not supported for gp2 volumes", nil), + expErr: awserr.New("InvalidParameterCombination", "The parameter iops is not supported for gp2 volumes", nil), + }, } for _, tc := range testCases { @@ -1534,17 +1535,17 @@ func TestResizeDisk(t *testing.T) { mockEC2.EXPECT().DescribeVolumesModificationsWithContext(gomock.Any(), gomock.Any()).Return(emptyOutput, nil).AnyTimes() } - newSize, err := c.ResizeDisk(ctx, tc.volumeID, util.GiBToBytes(tc.reqSizeGiB)) + newSize, err := c.ResizeOrModifyDisk(ctx, tc.volumeID, util.GiBToBytes(tc.reqSizeGiB), tc.modifyDiskOptions) if err != nil { if tc.expErr == nil { - t.Fatalf("ResizeDisk() failed: expected no error, got: %v", err) + t.Fatalf("ResizeOrModifyDisk() failed: expected no error, got: %v", err) } } else { if tc.expErr != nil { - t.Fatal("ResizeDisk() failed: expected error, got nothing") + t.Fatal("ResizeOrModifyDisk() failed: expected error, got nothing") } else { if tc.reqSizeGiB != newSize { - t.Fatalf("ResizeDisk() failed: expected capacity %d, got %d", tc.reqSizeGiB, newSize) + t.Fatalf("ResizeOrModifyDisk() failed: expected capacity %d, got %d", tc.reqSizeGiB, newSize) } } } diff --git a/pkg/cloud/mock_cloud.go b/pkg/cloud/mock_cloud.go index 9acb9376db..0f88ec04a4 100644 --- a/pkg/cloud/mock_cloud.go +++ b/pkg/cloud/mock_cloud.go @@ -243,33 +243,19 @@ func (mr *MockCloudMockRecorder) ListSnapshots(ctx, volumeID, maxResults, nextTo return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ListSnapshots", reflect.TypeOf((*MockCloud)(nil).ListSnapshots), ctx, volumeID, maxResults, nextToken) } -// ModifyDisk mocks base method. -func (m *MockCloud) ModifyDisk(ctx context.Context, volumeName string, modifyDiskOptions *ModifyDiskOptions) error { +// ResizeOrModifyDisk mocks base method. +func (m *MockCloud) ResizeOrModifyDisk(ctx context.Context, volumeID string, newSizeBytes int64, options *ModifyDiskOptions) (int64, error) { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "ModifyDisk", ctx, volumeName, modifyDiskOptions) - ret0, _ := ret[0].(error) - return ret0 -} - -// ModifyDisk indicates an expected call of ModifyDisk. -func (mr *MockCloudMockRecorder) ModifyDisk(ctx, volumeName, modifyDiskOptions interface{}) *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ModifyDisk", reflect.TypeOf((*MockCloud)(nil).ModifyDisk), ctx, volumeName, modifyDiskOptions) -} - -// ResizeDisk mocks base method. -func (m *MockCloud) ResizeDisk(ctx context.Context, volumeID string, reqSize int64) (int64, error) { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "ResizeDisk", ctx, volumeID, reqSize) + ret := m.ctrl.Call(m, "ResizeOrModifyDisk", ctx, volumeID, newSizeBytes, options) ret0, _ := ret[0].(int64) ret1, _ := ret[1].(error) return ret0, ret1 } -// ResizeDisk indicates an expected call of ResizeDisk. -func (mr *MockCloudMockRecorder) ResizeDisk(ctx, volumeID, reqSize interface{}) *gomock.Call { +// ResizeOrModifyDisk indicates an expected call of ResizeOrModifyDisk. +func (mr *MockCloudMockRecorder) ResizeOrModifyDisk(ctx, volumeID, newSizeBytes, options interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ResizeDisk", reflect.TypeOf((*MockCloud)(nil).ResizeDisk), ctx, volumeID, reqSize) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ResizeOrModifyDisk", reflect.TypeOf((*MockCloud)(nil).ResizeOrModifyDisk), ctx, volumeID, newSizeBytes, options) } // WaitForAttachmentState mocks base method. diff --git a/pkg/driver/controller.go b/pkg/driver/controller.go index 7104a83c38..bcc7387b37 100644 --- a/pkg/driver/controller.go +++ b/pkg/driver/controller.go @@ -61,9 +61,10 @@ const isManagedByDriver = "true" // controllerService represents the controller service of CSI driver type controllerService struct { - cloud cloud.Cloud - inFlight *internal.InFlight - driverOptions *DriverOptions + cloud cloud.Cloud + inFlight *internal.InFlight + driverOptions *DriverOptions + modifyVolumeManager *modifyVolumeManager rpc.UnimplementedModifyServer } @@ -97,9 +98,10 @@ func newControllerService(driverOptions *DriverOptions) controllerService { } return controllerService{ - cloud: cloudSrv, - inFlight: internal.NewInFlight(), - driverOptions: driverOptions, + cloud: cloudSrv, + inFlight: internal.NewInFlight(), + driverOptions: driverOptions, + modifyVolumeManager: newModifyVolumeManager(), } } @@ -508,9 +510,28 @@ func (d *controllerService) ControllerExpandVolume(ctx context.Context, req *csi return nil, status.Error(codes.InvalidArgument, "After round-up, volume size exceeds the limit specified") } - actualSizeGiB, err := d.cloud.ResizeDisk(ctx, volumeID, newSize) - if err != nil { - return nil, status.Errorf(codes.Internal, "Could not resize volume %q: %v", volumeID, err) + responseChan := make(chan modifyVolumeResponse) + modifyVolumeRequest := modifyVolumeRequest{ + newSize: newSize, + responseChan: responseChan, + } + + d.addModifyVolumeRequest(ctx, volumeID, &modifyVolumeRequest) + + var actualSizeGiB int64 + + defer d.modifyVolumeManager.deleteRequestHandler(volumeID) + + select { + case response := <-responseChan: + if response.err != nil { + return nil, status.Errorf(codes.Internal, "Could not resize volume %q: %v", volumeID, response.err) + } else { + actualSizeGiB = response.volumeSize + break + } + case <-ctx.Done(): + return nil, status.Errorf(codes.Internal, "Could not resize volume. Context is cancelled") } nodeExpansionRequired := true diff --git a/pkg/driver/controller_modify_volume.go b/pkg/driver/controller_modify_volume.go index ba2db7f561..d1f1d61cab 100644 --- a/pkg/driver/controller_modify_volume.go +++ b/pkg/driver/controller_modify_volume.go @@ -2,7 +2,10 @@ package driver import ( "context" + "fmt" "strconv" + "sync" + "time" "github.com/awslabs/volume-modifier-for-k8s/pkg/rpc" "github.com/kubernetes-sigs/aws-ebs-csi-driver/pkg/cloud" @@ -17,8 +20,149 @@ const ( ModificationKeyIOPS = "iops" ModificationKeyThroughput = "throughput" + + modifyVolumeRequestHandlerTimeout = 2 * time.Second ) +type modifyVolumeRequest struct { + newSize int64 + modifyDiskOptions cloud.ModifyDiskOptions + responseChan chan modifyVolumeResponse +} + +type modifyVolumeResponse struct { + volumeSize int64 + err error +} + +type modifyVolumeRequestHandler struct { + volumeID string + // Merged request from the requests that have been accepted for the volume + mergedRequest *modifyVolumeRequest + // Channel for sending requests to the Timer thread for the volume + requestChan chan *modifyVolumeRequest + // Channels for sending the response from the ec2 ModifyVolume API call to the CSI Driver main thread + responseChans []chan modifyVolumeResponse +} + +type modifyVolumeManager struct { + // Map of volume ID to modifyVolumeRequestHandler + requestHandlerMap sync.Map +} + +func newModifyVolumeManager() *modifyVolumeManager { + return &modifyVolumeManager{ + requestHandlerMap: sync.Map{}, + } +} + +func (m *modifyVolumeManager) deleteRequestHandler(volumeID string) { + m.requestHandlerMap.Delete(volumeID) +} + +func newModifyVolumeRequestHandler(volumeID string, request *modifyVolumeRequest) modifyVolumeRequestHandler { + requestChan := make(chan *modifyVolumeRequest) + responseChans := []chan modifyVolumeResponse{request.responseChan} + return modifyVolumeRequestHandler{ + requestChan: requestChan, + responseChans: responseChans, + mergedRequest: request, + volumeID: volumeID, + } +} + +// This function validates the new request against the merged request for the volume. +// If the new request has a volume property that's already included in the merged request and its value is different from that in the merged request, +// this function will return an error and the new request will be rejected. +func (h *modifyVolumeRequestHandler) validateModifyVolumeRequest(r *modifyVolumeRequest) error { + if r.newSize != 0 && h.mergedRequest.newSize != 0 && r.newSize != h.mergedRequest.newSize { + return fmt.Errorf("Different size was requested by a previous request. Current: %d, Requested: %d", h.mergedRequest.newSize, r.newSize) + } + if r.modifyDiskOptions.IOPS != 0 && h.mergedRequest.modifyDiskOptions.IOPS != 0 && r.modifyDiskOptions.IOPS != h.mergedRequest.modifyDiskOptions.IOPS { + return fmt.Errorf("Different IOPS was requested by a previous request. Current: %d, Requested: %d", h.mergedRequest.modifyDiskOptions.IOPS, r.modifyDiskOptions.IOPS) + } + if r.modifyDiskOptions.Throughput != 0 && h.mergedRequest.modifyDiskOptions.Throughput != 0 && r.modifyDiskOptions.Throughput != h.mergedRequest.modifyDiskOptions.Throughput { + return fmt.Errorf("Different throughput was requested by a previous request. Current: %d, Requested: %d", h.mergedRequest.modifyDiskOptions.Throughput, r.modifyDiskOptions.Throughput) + } + if r.modifyDiskOptions.VolumeType != "" && h.mergedRequest.modifyDiskOptions.VolumeType != "" && r.modifyDiskOptions.VolumeType != h.mergedRequest.modifyDiskOptions.VolumeType { + return fmt.Errorf("Different volume type was requested by a previous request. Current: %s, Requested: %s", h.mergedRequest.modifyDiskOptions.VolumeType, r.modifyDiskOptions.VolumeType) + } + return nil +} + +func (h *modifyVolumeRequestHandler) mergeModifyVolumeRequest(r *modifyVolumeRequest) { + if r.newSize != 0 { + h.mergedRequest.newSize = r.newSize + } + if r.modifyDiskOptions.IOPS != 0 { + h.mergedRequest.modifyDiskOptions.IOPS = r.modifyDiskOptions.IOPS + } + if r.modifyDiskOptions.Throughput != 0 { + h.mergedRequest.modifyDiskOptions.Throughput = r.modifyDiskOptions.Throughput + } + if r.modifyDiskOptions.VolumeType != "" { + h.mergedRequest.modifyDiskOptions.VolumeType = r.modifyDiskOptions.VolumeType + } + h.responseChans = append(h.responseChans, r.responseChan) +} + +// processModifyVolumeRequests method starts its execution with a timer that has modifyVolumeRequestHandlerTimeout as its timeout value. +// When the Timer times out, it calls the ec2 API to perform the volume modification. processModifyVolumeRequests method sends back the response of +// the ec2 API call to the CSI Driver main thread via response channels. +// This method receives requests from CSI driver main thread via the request channel. When a new request is received from the request channel, we first +// validate the new request. If the new request is acceptable, it will be merged with the existing request for the volume. +func (h *modifyVolumeRequestHandler) processModifyVolumeRequests(ctx context.Context, d *controllerService) { + klog.V(4).InfoS("Start processing ModifyVolumeRequest for ", "volume ID", h.volumeID) + for { + select { + case req := <-h.requestChan: + if err := h.validateModifyVolumeRequest(req); err != nil { + req.responseChan <- modifyVolumeResponse{err: err} + } else { + h.mergeModifyVolumeRequest(req) + } + case <-time.After(modifyVolumeRequestHandlerTimeout): + actualSizeGiB, err := d.executeModifyVolumeRequest(ctx, h.volumeID, h.mergedRequest) + for _, c := range h.responseChans { + c <- modifyVolumeResponse{volumeSize: actualSizeGiB, err: err} + } + return + } + } +} + +func (h *modifyVolumeRequestHandler) isVolumePropertyRequested() bool { + return h.mergedRequest.modifyDiskOptions.IOPS != 0 || h.mergedRequest.modifyDiskOptions.Throughput != 0 || h.mergedRequest.modifyDiskOptions.VolumeType != "" +} + +// When a new request comes in, we look up requestHandlerMap using the volume ID of the request. +// If there's no ModifyVolumeRequestHandler for the volume, meaning that there’s no inflight requests for the volume, we will start a goroutine +// for the volume calling processModifyVolumeRequests method, and ModifyVolumeRequestHandler for the volume will be added to requestHandlerMap. +// If there’s ModifyVolumeRequestHandler for the volume, meaning that there is inflight request(s) for the volume, we will send the new request +// to the goroutine for the volume via the receiving channel. +// Note that each volume with inflight requests has their own goroutine which follows timeout schedule of their own. +func (d *controllerService) addModifyVolumeRequest(ctx context.Context, volumeID string, r *modifyVolumeRequest) *modifyVolumeRequestHandler { + requestHandler := newModifyVolumeRequestHandler(volumeID, r) + handler, loaded := d.modifyVolumeManager.requestHandlerMap.LoadOrStore(volumeID, requestHandler) + if loaded { + h := handler.(modifyVolumeRequestHandler) + h.requestChan <- r + return &h + } else { + go requestHandler.processModifyVolumeRequests(ctx, d) + return &requestHandler + } +} + +func (d *controllerService) executeModifyVolumeRequest(ctx context.Context, volumeID string, req *modifyVolumeRequest) (int64, error) { + actualSizeGiB, err := d.cloud.ResizeOrModifyDisk(ctx, volumeID, req.newSize, &req.modifyDiskOptions) + if err != nil { + return 0, status.Errorf(codes.Internal, "Could not modify volume %q: %v", volumeID, err) + } else { + return actualSizeGiB, nil + } +} + func (d *controllerService) GetCSIDriverModificationCapability( _ context.Context, _ *rpc.GetCSIDriverModificationCapabilityRequest, @@ -30,8 +174,8 @@ func (d *controllerService) ModifyVolumeProperties( ctx context.Context, req *rpc.ModifyVolumePropertiesRequest, ) (*rpc.ModifyVolumePropertiesResponse, error) { - klog.V(4).InfoS("ModifyVolumeAttributes called", "req", req) - if err := validateModifyVolumeAttributesRequest(req); err != nil { + klog.V(4).InfoS("ModifyVolumeProperties called", "req", req) + if err := validateModifyVolumePropertiesRequest(req); err != nil { return nil, err } @@ -55,13 +199,31 @@ func (d *controllerService) ModifyVolumeProperties( modifyOptions.VolumeType = value } } - if err := d.cloud.ModifyDisk(ctx, name, &modifyOptions); err != nil { - return nil, status.Errorf(codes.Internal, "Could not modify volume %q: %v", name, err) + + responseChan := make(chan modifyVolumeResponse) + request := modifyVolumeRequest{ + modifyDiskOptions: modifyOptions, + responseChan: responseChan, + } + d.addModifyVolumeRequest(ctx, name, &request) + + defer d.modifyVolumeManager.deleteRequestHandler(name) + + select { + case response := <-responseChan: + if response.err != nil { + return nil, status.Errorf(codes.Internal, "Could not modify volume %q: %v", name, response.err) + } else { + break + } + case <-ctx.Done(): + return nil, status.Errorf(codes.Internal, "Could not modify volume. Context is cancelled") } + return &rpc.ModifyVolumePropertiesResponse{}, nil } -func validateModifyVolumeAttributesRequest(req *rpc.ModifyVolumePropertiesRequest) error { +func validateModifyVolumePropertiesRequest(req *rpc.ModifyVolumePropertiesRequest) error { name := req.GetName() if name == "" { return status.Error(codes.InvalidArgument, "Volume name not provided") diff --git a/pkg/driver/controller_test.go b/pkg/driver/controller_test.go index 8759d257ac..0493d9a039 100644 --- a/pkg/driver/controller_test.go +++ b/pkg/driver/controller_test.go @@ -3418,12 +3418,13 @@ func TestControllerExpandVolume(t *testing.T) { } mockCloud := cloud.NewMockCloud(mockCtl) - mockCloud.EXPECT().ResizeDisk(gomock.Eq(ctx), gomock.Eq(tc.req.VolumeId), gomock.Any()).Return(retSizeGiB, nil).AnyTimes() + mockCloud.EXPECT().ResizeOrModifyDisk(gomock.Eq(ctx), gomock.Eq(tc.req.VolumeId), gomock.Any(), gomock.Any()).Return(retSizeGiB, nil).AnyTimes() awsDriver := controllerService{ - cloud: mockCloud, - inFlight: internal.NewInFlight(), - driverOptions: &DriverOptions{}, + cloud: mockCloud, + inFlight: internal.NewInFlight(), + driverOptions: &DriverOptions{}, + modifyVolumeManager: newModifyVolumeManager(), } resp, err := awsDriver.ControllerExpandVolume(ctx, tc.req) diff --git a/pkg/driver/sanity_test.go b/pkg/driver/sanity_test.go index bcf1a520ad..a6a3aeaeff 100644 --- a/pkg/driver/sanity_test.go +++ b/pkg/driver/sanity_test.go @@ -150,17 +150,6 @@ func (c *fakeCloudProvider) CreateDisk(ctx context.Context, volumeName string, d return d.Disk, nil } -func (c *fakeCloudProvider) ModifyDisk(ctx context.Context, volumeID string, options *cloud.ModifyDiskOptions) error { - if d, ok := c.disks[volumeID]; !ok { - return cloud.ErrNotFound - } else { - d.iops = options.IOPS - d.throughput = options.Throughput - d.volumeType = options.VolumeType - } - return nil -} - func (c *fakeCloudProvider) DeleteDisk(ctx context.Context, volumeID string) (bool, error) { for volName, f := range c.disks { if f.Disk.VolumeID == volumeID { @@ -309,14 +298,16 @@ func (c *fakeCloudProvider) EnableFastSnapshotRestores(ctx context.Context, avai return nil, nil } -func (c *fakeCloudProvider) ResizeDisk(ctx context.Context, volumeID string, newSize int64) (int64, error) { - for volName, f := range c.disks { - if f.Disk.VolumeID == volumeID { - c.disks[volName].CapacityGiB = newSize - return newSize, nil - } +func (c *fakeCloudProvider) ResizeOrModifyDisk(ctx context.Context, volumeID string, newSize int64, options *cloud.ModifyDiskOptions) (int64, error) { + if d, ok := c.disks[volumeID]; !ok { + return 0, cloud.ErrNotFound + } else { + d.iops = options.IOPS + d.throughput = options.Throughput + d.volumeType = options.VolumeType + d.Disk.CapacityGiB = newSize + return newSize, nil } - return 0, cloud.ErrNotFound } type fakeMounter struct {