Skip to content

Commit

Permalink
Merge pull request #303 from digitalocean/enable-and-support-testnode…
Browse files Browse the repository at this point in the history
…volumeattachlimit

Enable tests for node volume attachment limits
  • Loading branch information
timoreimann committed Apr 24, 2020
2 parents f17d28a + 98c12a8 commit e702cab
Show file tree
Hide file tree
Showing 3 changed files with 75 additions and 2 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
## unreleased

* Handle per-node volume limit exceeding error during ControllerPublishVolume
[[GH-303]](https://github.com/digitalocean/csi-digitalocean/pull/303)
* Build using Go 1.14
[[GH-302]](https://github.com/digitalocean/csi-digitalocean/pull/302)
* Fix ListSnapshots paging
Expand Down
8 changes: 8 additions & 0 deletions driver/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,10 @@ const (
// doAPITimeout sets the timeout we will use when communicating with the
// Digital Ocean API. NOTE: some queries inherit the context timeout
doAPITimeout = 10 * time.Second

// maxVolumesPerDropletErrorMessage is the error message returned by the DO
// API when the per-droplet volume limit would be exceeded.
maxVolumesPerDropletErrorMessage = "cannot attach more than 7 volumes to a single Droplet"
)

var (
Expand Down Expand Up @@ -362,6 +366,10 @@ func (d *Driver) ControllerPublishVolume(ctx context.Context, req *csi.Controlle
// sending an abort makes sure the csi-attacher retries with the next backoff tick
return nil, status.Errorf(codes.Aborted, "cannot attach because droplet %d has pending action for volume %q", dropletID, req.VolumeId)
}

if strings.Contains(err.Error(), maxVolumesPerDropletErrorMessage) {
return nil, status.Errorf(codes.ResourceExhausted, err.Error())
}
}
return nil, err
}
Expand Down
67 changes: 65 additions & 2 deletions driver/driver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,8 @@ func TestDriverSuite(t *testing.T) {

cfg := sanity.NewTestConfig()
cfg.Address = endpoint
cfg.IdempotentCount = 5
cfg.TestNodeVolumeAttachLimit = true
sanity.Test(t, cfg)

cancel()
Expand Down Expand Up @@ -248,11 +250,72 @@ type fakeStorageActionsDriver struct {
}

func (f *fakeStorageActionsDriver) Attach(ctx context.Context, volumeID string, dropletID int) (*godo.Action, *godo.Response, error) {
return nil, godoResponse(), nil
resp := godoResponse()

if _, ok := f.volumes[volumeID]; !ok {
resp.Response = &http.Response{
StatusCode: http.StatusNotFound,
}
return nil, resp, errors.New("volume was not found")
}

droplet, ok := f.droplets[dropletID]
if !ok {
resp.Response = &http.Response{
StatusCode: http.StatusNotFound,
}
return nil, resp, errors.New("droplet was not found")
}

if len(droplet.VolumeIDs) >= maxVolumesPerNode {
resp.Response = &http.Response{
StatusCode: http.StatusUnprocessableEntity,
}
return nil, resp, errors.New(maxVolumesPerDropletErrorMessage)
}
droplet.VolumeIDs = append(droplet.VolumeIDs, volumeID)

return nil, resp, nil
}

func (f *fakeStorageActionsDriver) DetachByDropletID(ctx context.Context, volumeID string, dropletID int) (*godo.Action, *godo.Response, error) {
return nil, godoResponse(), nil
resp := godoResponse()

if _, ok := f.volumes[volumeID]; !ok {
resp.Response = &http.Response{
StatusCode: http.StatusNotFound,
}
return nil, resp, errors.New("volume was not found")
}

droplet, ok := f.droplets[dropletID]
if !ok {
resp.Response = &http.Response{
StatusCode: http.StatusNotFound,
}
return nil, resp, errors.New("droplet was not found")
}

var found bool
var updatedAttachedVolIDs []string
for _, attachedVolID := range droplet.VolumeIDs {
if attachedVolID == volumeID {
found = true
} else {
updatedAttachedVolIDs = append(updatedAttachedVolIDs, attachedVolID)
}
}

if !found {
resp.Response = &http.Response{
StatusCode: http.StatusNotFound,
}
return nil, resp, errors.New("volume is not attached to droplet")
}

droplet.VolumeIDs = updatedAttachedVolIDs

return nil, resp, nil
}

func (f *fakeStorageActionsDriver) Get(ctx context.Context, volumeID string, actionID int) (*godo.Action, *godo.Response, error) {
Expand Down

0 comments on commit e702cab

Please sign in to comment.