Skip to content

Commit

Permalink
Tune batched EC2 Describe* maxDelay
Browse files Browse the repository at this point in the history
  • Loading branch information
AndrewSirenko committed May 1, 2024
1 parent c124663 commit 1282f78
Show file tree
Hide file tree
Showing 2 changed files with 16 additions and 12 deletions.
20 changes: 12 additions & 8 deletions pkg/cloud/cloud.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,8 @@ const (
snapshotTagBatcher

batchDescribeTimeout = 30 * time.Second

batchDelay = 500 * time.Millisecond // Tuned via scalability tests to minimize latency and EC2 API Calls
)

var (
Expand Down Expand Up @@ -340,7 +342,7 @@ func newEC2Cloud(region string, awsSdkDebugLog bool, userAgentExtra string, batc
var bm *batcherManager
if batchingEnabled {
klog.V(4).InfoS("newEC2Cloud: batching enabled")
bm = newBatcherManager(svc)
bm = newBatcherManager(svc, batchDelay)
}

rm := newRetryManager()
Expand All @@ -355,24 +357,26 @@ func newEC2Cloud(region string, awsSdkDebugLog bool, userAgentExtra string, batc
}

// newBatcherManager initializes a new instance of batcherManager.
func newBatcherManager(svc EC2API) *batcherManager {
// Each batcher's `entries` set to maximum results returned by relevant EC2 API call without pagination.
// Each batcher's `delay` minimizes RPC latency and EC2 API calls. Tuned via scalability tests.
func newBatcherManager(svc EC2API, delay time.Duration) *batcherManager {
return &batcherManager{
volumeIDBatcher: batcher.New(500, 1*time.Second, func(ids []string) (map[string]*types.Volume, error) {
volumeIDBatcher: batcher.New(500, delay, func(ids []string) (map[string]*types.Volume, error) {
return execBatchDescribeVolumes(svc, ids, volumeIDBatcher)
}),
volumeTagBatcher: batcher.New(500, 1*time.Second, func(names []string) (map[string]*types.Volume, error) {
volumeTagBatcher: batcher.New(500, delay, func(names []string) (map[string]*types.Volume, error) {
return execBatchDescribeVolumes(svc, names, volumeTagBatcher)
}),
instanceIDBatcher: batcher.New(50, 300*time.Millisecond, func(ids []string) (map[string]*types.Instance, error) {
instanceIDBatcher: batcher.New(50, delay, func(ids []string) (map[string]*types.Instance, error) {
return execBatchDescribeInstances(svc, ids)
}),
snapshotIDBatcher: batcher.New(1000, 300*time.Millisecond, func(ids []string) (map[string]*types.Snapshot, error) {
snapshotIDBatcher: batcher.New(1000, delay, func(ids []string) (map[string]*types.Snapshot, error) {
return execBatchDescribeSnapshots(svc, ids, snapshotIDBatcher)
}),
snapshotTagBatcher: batcher.New(1000, 300*time.Millisecond, func(names []string) (map[string]*types.Snapshot, error) {
snapshotTagBatcher: batcher.New(1000, delay, func(names []string) (map[string]*types.Snapshot, error) {
return execBatchDescribeSnapshots(svc, names, snapshotTagBatcher)
}),
volumeModificationIDBatcher: batcher.New(500, 300*time.Millisecond, func(names []string) (map[string]*types.VolumeModification, error) {
volumeModificationIDBatcher: batcher.New(500, delay, func(names []string) (map[string]*types.VolumeModification, error) {
return execBatchDescribeVolumesModifications(svc, names)
}),
}
Expand Down
8 changes: 4 additions & 4 deletions pkg/cloud/cloud_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,7 @@ func TestBatchDescribeVolumes(t *testing.T) {
mockEC2 := NewMockEC2API(mockCtrl)
c := newCloud(mockEC2)
cloudInstance := c.(*cloud)
cloudInstance.bm = newBatcherManager(cloudInstance.ec2)
cloudInstance.bm = newBatcherManager(cloudInstance.ec2, batchDelay)

tc.mockFunc(mockEC2, tc.expErr, tc.volumes)
volumeIDs, volumeNames := extractVolumeIdentifiers(tc.volumes)
Expand Down Expand Up @@ -300,7 +300,7 @@ func TestBatchDescribeInstances(t *testing.T) {
mockEC2 := NewMockEC2API(mockCtrl)
c := newCloud(mockEC2)
cloudInstance := c.(*cloud)
cloudInstance.bm = newBatcherManager(cloudInstance.ec2)
cloudInstance.bm = newBatcherManager(cloudInstance.ec2, batchDelay)

// Setup mocks
var instances []types.Instance
Expand Down Expand Up @@ -475,7 +475,7 @@ func TestBatchDescribeSnapshots(t *testing.T) {
mockEC2 := NewMockEC2API(mockCtrl)
c := newCloud(mockEC2)
cloudInstance := c.(*cloud)
cloudInstance.bm = newBatcherManager(cloudInstance.ec2)
cloudInstance.bm = newBatcherManager(cloudInstance.ec2, batchDelay)

tc.mockFunc(mockEC2, tc.expErr, tc.snapshots)
snapshotIDs, snapshotNames := extractSnapshotIdentifiers(tc.snapshots)
Expand Down Expand Up @@ -594,7 +594,7 @@ func TestBatchDescribeVolumesModifications(t *testing.T) {
mockEC2 := NewMockEC2API(mockCtrl)
c := newCloud(mockEC2)
cloudInstance := c.(*cloud)
cloudInstance.bm = newBatcherManager(cloudInstance.ec2)
cloudInstance.bm = newBatcherManager(cloudInstance.ec2, batchDelay)

// Setup mocks
var volumeModifications []types.VolumeModification
Expand Down

0 comments on commit 1282f78

Please sign in to comment.