Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Request coalescing for resizing and modifying volume #1676

Merged
merged 1 commit into from
Aug 11, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion docs/modify-volume.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ Users can specify the following PVC annotations:
## Considerations

- Keep in mind the [6 hour cooldown period](https://docs.aws.amazon.com/AWSEC2/latest/APIReference/API_ModifyVolume.html) for EBS ModifyVolume. Multiple ModifyVolume calls for the same volume within a 6 hour period will fail.
- It is not yet possible to update both the annotations and capacity of the PVC at the same time. This results in multiple RPC calls to the driver, and only one of them will succeed (due to the cooldown period). A future release of the driver will lift this restriction.
- Ensure that the desired volume properties are permissible. The driver does minimum client side validation.

## Example
Expand Down
168 changes: 87 additions & 81 deletions pkg/cloud/cloud.go
Original file line number Diff line number Diff line change
Expand Up @@ -467,35 +467,68 @@ func (c *cloud) CreateDisk(ctx context.Context, volumeName string, diskOptions *
return &Disk{CapacityGiB: size, VolumeID: volumeID, AvailabilityZone: zone, SnapshotID: snapshotID, OutpostArn: outpostArn}, nil
}

func (c *cloud) ModifyDisk(ctx context.Context, volumeID string, options *ModifyDiskOptions) error {
klog.V(4).InfoS("Received ModifyDisk request", "volumeID", volumeID, "options", options)
// ResizeOrModifyDisk resizes an EBS volume in GiB increments, rouding up to the next possible allocatable unit, and/or modifies an EBS
// volume with the parameters in ModifyDiskOptions.
// The resizing operation is performed only when newSizeBytes != 0.
// It returns the volume size after this call or an error if the size couldn't be determined or the volume couldn't be modified.
func (c *cloud) ResizeOrModifyDisk(ctx context.Context, volumeID string, newSizeBytes int64, options *ModifyDiskOptions) (int64, error) {
torredil marked this conversation as resolved.
Show resolved Hide resolved
if newSizeBytes != 0 {
klog.V(4).InfoS("Received Resize and/or Modify Disk request", "volumeID", volumeID, "newSizeBytes", newSizeBytes, "options", options)
} else {
klog.V(4).InfoS("Received Modify Disk request", "volumeID", volumeID, "options", options)
}

newSizeGiB := util.RoundUpGiB(newSizeBytes)
var oldSizeGiB int64
if newSizeBytes != 0 {
volumeSize, err := c.validateVolumeSize(ctx, volumeID, newSizeGiB)
if err != nil || volumeSize != 0 {
return volumeSize, err
}
}

req := &ec2.ModifyVolumeInput{
VolumeId: aws.String(volumeID),
}

// Only set req.size for resizing volume
if newSizeBytes != 0 {
req.Size = aws.Int64(newSizeGiB)
}
if options.IOPS != 0 {
req.Iops = aws.Int64(int64(options.IOPS))
}
if options.VolumeType != "" {
req.VolumeType = aws.String(options.VolumeType)
}
if options.Throughput != 0 {
if options.VolumeType == VolumeTypeGP3 {
req.Throughput = aws.Int64(int64(options.Throughput))
}

res, err := c.ec2.ModifyVolumeWithContext(ctx, req)
response, err := c.ec2.ModifyVolumeWithContext(ctx, req)
if err != nil {
return fmt.Errorf("unable to modify AWS volume %q: %w", volumeID, err)
return 0, fmt.Errorf("unable to modify AWS volume %q: %w", volumeID, err)
}

mod := res.VolumeModification
mod := response.VolumeModification
state := aws.StringValue(mod.ModificationState)

if volumeModificationDone(state) {
return nil
if newSizeBytes != 0 {
return c.checkDesiredSize(ctx, volumeID, newSizeGiB)
} else {
return 0, nil
}
}

return c.waitForVolumeSize(ctx, volumeID)
err = c.waitForVolumeSize(ctx, volumeID)
if newSizeBytes != 0 {
if err != nil {
return oldSizeGiB, err
}
return c.checkDesiredSize(ctx, volumeID, newSizeGiB)
} else {
return 0, c.waitForVolumeSize(ctx, volumeID)
}
}

func (c *cloud) DeleteDisk(ctx context.Context, volumeID string) (bool, error) {
Expand Down Expand Up @@ -1134,78 +1167,6 @@ func isAWSErrorIdempotentParameterMismatch(err error) bool {
return isAWSError(err, "IdempotentParameterMismatch")
}

// ResizeDisk resizes an EBS volume in GiB increments, rouding up to the next possible allocatable unit.
// It returns the volume size after this call or an error if the size couldn't be determined.
func (c *cloud) ResizeDisk(ctx context.Context, volumeID string, newSizeBytes int64) (int64, error) {
request := &ec2.DescribeVolumesInput{
VolumeIds: []*string{
aws.String(volumeID),
},
}
volume, err := c.getVolume(ctx, request)
if err != nil {
return 0, err
}

// AWS resizes in chunks of GiB (not GB)
newSizeGiB := util.RoundUpGiB(newSizeBytes)
oldSizeGiB := aws.Int64Value(volume.Size)

latestMod, modFetchError := c.getLatestVolumeModification(ctx, volumeID)

if latestMod != nil && modFetchError == nil {
state := aws.StringValue(latestMod.ModificationState)
if state == ec2.VolumeModificationStateModifying {
err = c.waitForVolumeSize(ctx, volumeID)
if err != nil {
return oldSizeGiB, err
}
return c.checkDesiredSize(ctx, volumeID, newSizeGiB)
}
}

// if there was an error fetching volume modifications and it was anything other than VolumeNotBeingModified error
// that means we have an API problem.
if modFetchError != nil && !errors.Is(modFetchError, VolumeNotBeingModified) {
return oldSizeGiB, fmt.Errorf("error fetching volume modifications for %q: %w", volumeID, modFetchError)
}

// Even if existing volume size is greater than user requested size, we should ensure that there are no pending
// volume modifications objects or volume has completed previously issued modification request.
if oldSizeGiB >= newSizeGiB {
klog.V(5).InfoS("[Debug] Volume", "volumeID", volumeID, "oldSizeGiB", oldSizeGiB, "newSizeGiB", newSizeGiB)
err = c.waitForVolumeSize(ctx, volumeID)
if err != nil && !errors.Is(err, VolumeNotBeingModified) {
return oldSizeGiB, err
}
return oldSizeGiB, nil
}

req := &ec2.ModifyVolumeInput{
VolumeId: aws.String(volumeID),
Size: aws.Int64(newSizeGiB),
}

klog.V(4).InfoS("expanding volume", "volumeID", volumeID, "newSizeGiB", newSizeGiB)
response, err := c.ec2.ModifyVolumeWithContext(ctx, req)
if err != nil {
return 0, fmt.Errorf("could not modify AWS volume %q: %w", volumeID, err)
}

mod := response.VolumeModification

state := aws.StringValue(mod.ModificationState)
if volumeModificationDone(state) {
return c.checkDesiredSize(ctx, volumeID, newSizeGiB)
}

err = c.waitForVolumeSize(ctx, volumeID)
if err != nil {
return oldSizeGiB, err
}
return c.checkDesiredSize(ctx, volumeID, newSizeGiB)
}

// Checks for desired size on volume by also verifying volume size by describing volume.
// This is to get around potential eventual consistency problems with describing volume modifications
// objects and ensuring that we read two different objects to verify volume state.
Expand Down Expand Up @@ -1310,6 +1271,51 @@ func (c *cloud) AvailabilityZones(ctx context.Context) (map[string]struct{}, err
return zones, nil
}

func (c *cloud) validateVolumeSize(ctx context.Context, volumeID string, newSizeGiB int64) (int64, error) {
request := &ec2.DescribeVolumesInput{
VolumeIds: []*string{
aws.String(volumeID),
},
}
volume, err := c.getVolume(ctx, request)
if err != nil {
return 0, err
}

oldSizeGiB := aws.Int64Value(volume.Size)

latestMod, modFetchError := c.getLatestVolumeModification(ctx, volumeID)

if latestMod != nil && modFetchError == nil {
state := aws.StringValue(latestMod.ModificationState)
if state == ec2.VolumeModificationStateModifying {
err = c.waitForVolumeSize(ctx, volumeID)
if err != nil {
return oldSizeGiB, err
}
return c.checkDesiredSize(ctx, volumeID, newSizeGiB)
}
}

// if there was an error fetching volume modifications and it was anything other than VolumeNotBeingModified error
// that means we have an API problem.
if modFetchError != nil && !errors.Is(modFetchError, VolumeNotBeingModified) {
return oldSizeGiB, fmt.Errorf("error fetching volume modifications for %q: %w", volumeID, modFetchError)
}

// Even if existing volume size is greater than user requested size, we should ensure that there are no pending
// volume modifications objects or volume has completed previously issued modification request.
if oldSizeGiB >= newSizeGiB {
klog.V(5).InfoS("[Debug] Volume", "volumeID", volumeID, "oldSizeGiB", oldSizeGiB, "newSizeGiB", newSizeGiB)
err = c.waitForVolumeSize(ctx, volumeID)
if err != nil && !errors.Is(err, VolumeNotBeingModified) {
return oldSizeGiB, err
}
return oldSizeGiB, nil
}
return 0, nil
}

func volumeModificationDone(state string) bool {
if state == ec2.VolumeModificationStateCompleted || state == ec2.VolumeModificationStateOptimizing {
return true
Expand Down
3 changes: 1 addition & 2 deletions pkg/cloud/cloud_interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,10 @@ import (

type Cloud interface {
CreateDisk(ctx context.Context, volumeName string, diskOptions *DiskOptions) (disk *Disk, err error)
ModifyDisk(ctx context.Context, volumeName string, modifyDiskOptions *ModifyDiskOptions) error
DeleteDisk(ctx context.Context, volumeID string) (success bool, err error)
AttachDisk(ctx context.Context, volumeID string, nodeID string) (devicePath string, err error)
DetachDisk(ctx context.Context, volumeID string, nodeID string) (err error)
ResizeDisk(ctx context.Context, volumeID string, reqSize int64) (newSize int64, err error)
ResizeOrModifyDisk(ctx context.Context, volumeID string, newSizeBytes int64, options *ModifyDiskOptions) (newSize int64, err error)
WaitForAttachmentState(ctx context.Context, volumeID, expectedState string, expectedInstance string, expectedDevice string, alreadyAssigned bool) (*ec2.VolumeAttachment, error)
GetDiskByName(ctx context.Context, name string, capacityBytes int64) (disk *Disk, err error)
GetDiskByID(ctx context.Context, volumeID string) (disk *Disk, err error)
Expand Down
Loading