Skip to content

Commit

Permalink
Request coalescing for resizing and modifying volume
Browse files Browse the repository at this point in the history
Signed-off-by: Hanyue Liang <hanliang@amazon.com>
  • Loading branch information
hanyuel committed Aug 9, 2023
1 parent 6f2db76 commit 6bd42fb
Show file tree
Hide file tree
Showing 9 changed files with 375 additions and 209 deletions.
1 change: 0 additions & 1 deletion docs/modify-volume.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ Users can specify the following PVC annotations:
## Considerations

- Keep in mind the [6 hour cooldown period](https://docs.aws.amazon.com/AWSEC2/latest/APIReference/API_ModifyVolume.html) for EBS ModifyVolume. Multiple ModifyVolume calls for the same volume within a 6 hour period will fail.
- It is not yet possible to update both the annotations and capacity of the PVC at the same time. This results in multiple RPC calls to the driver, and only one of them will succeed (due to the cooldown period). A future release of the driver will lift this restriction.
- Ensure that the desired volume properties are permissible. The driver does minimum client side validation.

## Example
Expand Down
168 changes: 87 additions & 81 deletions pkg/cloud/cloud.go
Original file line number Diff line number Diff line change
Expand Up @@ -467,35 +467,68 @@ func (c *cloud) CreateDisk(ctx context.Context, volumeName string, diskOptions *
return &Disk{CapacityGiB: size, VolumeID: volumeID, AvailabilityZone: zone, SnapshotID: snapshotID, OutpostArn: outpostArn}, nil
}

func (c *cloud) ModifyDisk(ctx context.Context, volumeID string, options *ModifyDiskOptions) error {
klog.V(4).InfoS("Received ModifyDisk request", "volumeID", volumeID, "options", options)
// ResizeOrModifyDisk resizes an EBS volume in GiB increments, rouding up to the next possible allocatable unit, and/or modifies an EBS
// volume with the parameters in ModifyDiskOptions.
// The resizing operation is performed only when newSizeBytes != 0.
// It returns the volume size after this call or an error if the size couldn't be determined or the volume couldn't be modified.
func (c *cloud) ResizeOrModifyDisk(ctx context.Context, volumeID string, newSizeBytes int64, options *ModifyDiskOptions) (int64, error) {
if newSizeBytes != 0 {
klog.V(4).InfoS("Received Resize and/or Modify Disk request", "volumeID", volumeID, "newSizeBytes", newSizeBytes, "options", options)
} else {
klog.V(4).InfoS("Received Modify Disk request", "volumeID", volumeID, "options", options)
}

newSizeGiB := util.RoundUpGiB(newSizeBytes)
var oldSizeGiB int64
if newSizeBytes != 0 {
volumeSize, err := c.validateVolumeSize(ctx, volumeID, newSizeGiB)
if err != nil || volumeSize != 0 {
return volumeSize, err
}
}

req := &ec2.ModifyVolumeInput{
VolumeId: aws.String(volumeID),
}

// Only set req.size for resizing volume
if newSizeBytes != 0 {
req.Size = aws.Int64(newSizeGiB)
}
if options.IOPS != 0 {
req.Iops = aws.Int64(int64(options.IOPS))
}
if options.VolumeType != "" {
req.VolumeType = aws.String(options.VolumeType)
}
if options.Throughput != 0 {
if options.VolumeType == VolumeTypeGP3 {
req.Throughput = aws.Int64(int64(options.Throughput))
}

res, err := c.ec2.ModifyVolumeWithContext(ctx, req)
response, err := c.ec2.ModifyVolumeWithContext(ctx, req)
if err != nil {
return fmt.Errorf("unable to modify AWS volume %q: %w", volumeID, err)
return 0, fmt.Errorf("unable to modify AWS volume %q: %w", volumeID, err)
}

mod := res.VolumeModification
mod := response.VolumeModification
state := aws.StringValue(mod.ModificationState)

if volumeModificationDone(state) {
return nil
if newSizeBytes != 0 {
return c.checkDesiredSize(ctx, volumeID, newSizeGiB)
} else {
return 0, nil
}
}

return c.waitForVolumeSize(ctx, volumeID)
err = c.waitForVolumeSize(ctx, volumeID)
if newSizeBytes != 0 {
if err != nil {
return oldSizeGiB, err
}
return c.checkDesiredSize(ctx, volumeID, newSizeGiB)
} else {
return 0, c.waitForVolumeSize(ctx, volumeID)
}
}

func (c *cloud) DeleteDisk(ctx context.Context, volumeID string) (bool, error) {
Expand Down Expand Up @@ -1134,78 +1167,6 @@ func isAWSErrorIdempotentParameterMismatch(err error) bool {
return isAWSError(err, "IdempotentParameterMismatch")
}

// ResizeDisk resizes an EBS volume in GiB increments, rouding up to the next possible allocatable unit.
// It returns the volume size after this call or an error if the size couldn't be determined.
func (c *cloud) ResizeDisk(ctx context.Context, volumeID string, newSizeBytes int64) (int64, error) {
request := &ec2.DescribeVolumesInput{
VolumeIds: []*string{
aws.String(volumeID),
},
}
volume, err := c.getVolume(ctx, request)
if err != nil {
return 0, err
}

// AWS resizes in chunks of GiB (not GB)
newSizeGiB := util.RoundUpGiB(newSizeBytes)
oldSizeGiB := aws.Int64Value(volume.Size)

latestMod, modFetchError := c.getLatestVolumeModification(ctx, volumeID)

if latestMod != nil && modFetchError == nil {
state := aws.StringValue(latestMod.ModificationState)
if state == ec2.VolumeModificationStateModifying {
err = c.waitForVolumeSize(ctx, volumeID)
if err != nil {
return oldSizeGiB, err
}
return c.checkDesiredSize(ctx, volumeID, newSizeGiB)
}
}

// if there was an error fetching volume modifications and it was anything other than VolumeNotBeingModified error
// that means we have an API problem.
if modFetchError != nil && !errors.Is(modFetchError, VolumeNotBeingModified) {
return oldSizeGiB, fmt.Errorf("error fetching volume modifications for %q: %w", volumeID, modFetchError)
}

// Even if existing volume size is greater than user requested size, we should ensure that there are no pending
// volume modifications objects or volume has completed previously issued modification request.
if oldSizeGiB >= newSizeGiB {
klog.V(5).InfoS("[Debug] Volume", "volumeID", volumeID, "oldSizeGiB", oldSizeGiB, "newSizeGiB", newSizeGiB)
err = c.waitForVolumeSize(ctx, volumeID)
if err != nil && !errors.Is(err, VolumeNotBeingModified) {
return oldSizeGiB, err
}
return oldSizeGiB, nil
}

req := &ec2.ModifyVolumeInput{
VolumeId: aws.String(volumeID),
Size: aws.Int64(newSizeGiB),
}

klog.V(4).InfoS("expanding volume", "volumeID", volumeID, "newSizeGiB", newSizeGiB)
response, err := c.ec2.ModifyVolumeWithContext(ctx, req)
if err != nil {
return 0, fmt.Errorf("could not modify AWS volume %q: %w", volumeID, err)
}

mod := response.VolumeModification

state := aws.StringValue(mod.ModificationState)
if volumeModificationDone(state) {
return c.checkDesiredSize(ctx, volumeID, newSizeGiB)
}

err = c.waitForVolumeSize(ctx, volumeID)
if err != nil {
return oldSizeGiB, err
}
return c.checkDesiredSize(ctx, volumeID, newSizeGiB)
}

// Checks for desired size on volume by also verifying volume size by describing volume.
// This is to get around potential eventual consistency problems with describing volume modifications
// objects and ensuring that we read two different objects to verify volume state.
Expand Down Expand Up @@ -1310,6 +1271,51 @@ func (c *cloud) AvailabilityZones(ctx context.Context) (map[string]struct{}, err
return zones, nil
}

func (c *cloud) validateVolumeSize(ctx context.Context, volumeID string, newSizeGiB int64) (int64, error) {
request := &ec2.DescribeVolumesInput{
VolumeIds: []*string{
aws.String(volumeID),
},
}
volume, err := c.getVolume(ctx, request)
if err != nil {
return 0, err
}

oldSizeGiB := aws.Int64Value(volume.Size)

latestMod, modFetchError := c.getLatestVolumeModification(ctx, volumeID)

if latestMod != nil && modFetchError == nil {
state := aws.StringValue(latestMod.ModificationState)
if state == ec2.VolumeModificationStateModifying {
err = c.waitForVolumeSize(ctx, volumeID)
if err != nil {
return oldSizeGiB, err
}
return c.checkDesiredSize(ctx, volumeID, newSizeGiB)
}
}

// if there was an error fetching volume modifications and it was anything other than VolumeNotBeingModified error
// that means we have an API problem.
if modFetchError != nil && !errors.Is(modFetchError, VolumeNotBeingModified) {
return oldSizeGiB, fmt.Errorf("error fetching volume modifications for %q: %w", volumeID, modFetchError)
}

// Even if existing volume size is greater than user requested size, we should ensure that there are no pending
// volume modifications objects or volume has completed previously issued modification request.
if oldSizeGiB >= newSizeGiB {
klog.V(5).InfoS("[Debug] Volume", "volumeID", volumeID, "oldSizeGiB", oldSizeGiB, "newSizeGiB", newSizeGiB)
err = c.waitForVolumeSize(ctx, volumeID)
if err != nil && !errors.Is(err, VolumeNotBeingModified) {
return oldSizeGiB, err
}
return oldSizeGiB, nil
}
return 0, nil
}

func volumeModificationDone(state string) bool {
if state == ec2.VolumeModificationStateCompleted || state == ec2.VolumeModificationStateOptimizing {
return true
Expand Down
3 changes: 1 addition & 2 deletions pkg/cloud/cloud_interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,10 @@ import (

type Cloud interface {
CreateDisk(ctx context.Context, volumeName string, diskOptions *DiskOptions) (disk *Disk, err error)
ModifyDisk(ctx context.Context, volumeName string, modifyDiskOptions *ModifyDiskOptions) error
DeleteDisk(ctx context.Context, volumeID string) (success bool, err error)
AttachDisk(ctx context.Context, volumeID string, nodeID string) (devicePath string, err error)
DetachDisk(ctx context.Context, volumeID string, nodeID string) (err error)
ResizeDisk(ctx context.Context, volumeID string, reqSize int64) (newSize int64, err error)
ResizeOrModifyDisk(ctx context.Context, volumeID string, newSizeBytes int64, options *ModifyDiskOptions) (newSize int64, err error)
WaitForAttachmentState(ctx context.Context, volumeID, expectedState string, expectedInstance string, expectedDevice string, alreadyAssigned bool) (*ec2.VolumeAttachment, error)
GetDiskByName(ctx context.Context, name string, capacityBytes int64) (disk *Disk, err error)
GetDiskByID(ctx context.Context, volumeID string) (disk *Disk, err error)
Expand Down
Loading

0 comments on commit 6bd42fb

Please sign in to comment.