From 1282f7856de064febe40e7896da09031840e0ff2 Mon Sep 17 00:00:00 2001 From: Drew Sirenko <68304519+AndrewSirenko@users.noreply.github.com> Date: Wed, 1 May 2024 14:24:19 +0000 Subject: [PATCH] Tune batched EC2 Describe* maxDelay --- pkg/cloud/cloud.go | 20 ++++++++++++-------- pkg/cloud/cloud_test.go | 8 ++++---- 2 files changed, 16 insertions(+), 12 deletions(-) diff --git a/pkg/cloud/cloud.go b/pkg/cloud/cloud.go index fa8aea8987..b184c51e01 100644 --- a/pkg/cloud/cloud.go +++ b/pkg/cloud/cloud.go @@ -151,6 +151,8 @@ const ( snapshotTagBatcher batchDescribeTimeout = 30 * time.Second + + batchDelay = 500 * time.Millisecond // Tuned via scalability tests to minimize latency and EC2 API Calls ) var ( @@ -340,7 +342,7 @@ func newEC2Cloud(region string, awsSdkDebugLog bool, userAgentExtra string, batc var bm *batcherManager if batchingEnabled { klog.V(4).InfoS("newEC2Cloud: batching enabled") - bm = newBatcherManager(svc) + bm = newBatcherManager(svc, batchDelay) } rm := newRetryManager() @@ -355,24 +357,26 @@ func newEC2Cloud(region string, awsSdkDebugLog bool, userAgentExtra string, batc } // newBatcherManager initializes a new instance of batcherManager. -func newBatcherManager(svc EC2API) *batcherManager { +// Each batcher's `entries` set to maximum results returned by relevant EC2 API call without pagination. +// Each batcher's `delay` minimizes RPC latency and EC2 API calls. Tuned via scalability tests. +func newBatcherManager(svc EC2API, delay time.Duration) *batcherManager { return &batcherManager{ - volumeIDBatcher: batcher.New(500, 1*time.Second, func(ids []string) (map[string]*types.Volume, error) { + volumeIDBatcher: batcher.New(500, delay, func(ids []string) (map[string]*types.Volume, error) { return execBatchDescribeVolumes(svc, ids, volumeIDBatcher) }), - volumeTagBatcher: batcher.New(500, 1*time.Second, func(names []string) (map[string]*types.Volume, error) { + volumeTagBatcher: batcher.New(500, delay, func(names []string) (map[string]*types.Volume, error) { return execBatchDescribeVolumes(svc, names, volumeTagBatcher) }), - instanceIDBatcher: batcher.New(50, 300*time.Millisecond, func(ids []string) (map[string]*types.Instance, error) { + instanceIDBatcher: batcher.New(50, delay, func(ids []string) (map[string]*types.Instance, error) { return execBatchDescribeInstances(svc, ids) }), - snapshotIDBatcher: batcher.New(1000, 300*time.Millisecond, func(ids []string) (map[string]*types.Snapshot, error) { + snapshotIDBatcher: batcher.New(1000, delay, func(ids []string) (map[string]*types.Snapshot, error) { return execBatchDescribeSnapshots(svc, ids, snapshotIDBatcher) }), - snapshotTagBatcher: batcher.New(1000, 300*time.Millisecond, func(names []string) (map[string]*types.Snapshot, error) { + snapshotTagBatcher: batcher.New(1000, delay, func(names []string) (map[string]*types.Snapshot, error) { return execBatchDescribeSnapshots(svc, names, snapshotTagBatcher) }), - volumeModificationIDBatcher: batcher.New(500, 300*time.Millisecond, func(names []string) (map[string]*types.VolumeModification, error) { + volumeModificationIDBatcher: batcher.New(500, delay, func(names []string) (map[string]*types.VolumeModification, error) { return execBatchDescribeVolumesModifications(svc, names) }), } diff --git a/pkg/cloud/cloud_test.go b/pkg/cloud/cloud_test.go index 65fd1ceea0..e1a6016ac2 100644 --- a/pkg/cloud/cloud_test.go +++ b/pkg/cloud/cloud_test.go @@ -183,7 +183,7 @@ func TestBatchDescribeVolumes(t *testing.T) { mockEC2 := NewMockEC2API(mockCtrl) c := newCloud(mockEC2) cloudInstance := c.(*cloud) - cloudInstance.bm = newBatcherManager(cloudInstance.ec2) + cloudInstance.bm = newBatcherManager(cloudInstance.ec2, batchDelay) tc.mockFunc(mockEC2, tc.expErr, tc.volumes) volumeIDs, volumeNames := extractVolumeIdentifiers(tc.volumes) @@ -300,7 +300,7 @@ func TestBatchDescribeInstances(t *testing.T) { mockEC2 := NewMockEC2API(mockCtrl) c := newCloud(mockEC2) cloudInstance := c.(*cloud) - cloudInstance.bm = newBatcherManager(cloudInstance.ec2) + cloudInstance.bm = newBatcherManager(cloudInstance.ec2, batchDelay) // Setup mocks var instances []types.Instance @@ -475,7 +475,7 @@ func TestBatchDescribeSnapshots(t *testing.T) { mockEC2 := NewMockEC2API(mockCtrl) c := newCloud(mockEC2) cloudInstance := c.(*cloud) - cloudInstance.bm = newBatcherManager(cloudInstance.ec2) + cloudInstance.bm = newBatcherManager(cloudInstance.ec2, batchDelay) tc.mockFunc(mockEC2, tc.expErr, tc.snapshots) snapshotIDs, snapshotNames := extractSnapshotIdentifiers(tc.snapshots) @@ -594,7 +594,7 @@ func TestBatchDescribeVolumesModifications(t *testing.T) { mockEC2 := NewMockEC2API(mockCtrl) c := newCloud(mockEC2) cloudInstance := c.(*cloud) - cloudInstance.bm = newBatcherManager(cloudInstance.ec2) + cloudInstance.bm = newBatcherManager(cloudInstance.ec2, batchDelay) // Setup mocks var volumeModifications []types.VolumeModification