From fdc2ec2c84c8d21fb23ba82d2aeb95cdef8091e8 Mon Sep 17 00:00:00 2001 From: Easwar Swaminathan Date: Wed, 9 Oct 2024 16:57:29 -0700 Subject: [PATCH] xdsclient: deflake TestADS_ResourcesAreRequestedAfterStreamRestart (#7720) --- .../tests/ads_stream_backoff_test.go | 7 ++-- .../tests/ads_stream_restart_test.go | 39 +++++++++++++++++-- 2 files changed, 40 insertions(+), 6 deletions(-) diff --git a/xds/internal/xdsclient/tests/ads_stream_backoff_test.go b/xds/internal/xdsclient/tests/ads_stream_backoff_test.go index c94945321ba5..fe8125048207 100644 --- a/xds/internal/xdsclient/tests/ads_stream_backoff_test.go +++ b/xds/internal/xdsclient/tests/ads_stream_backoff_test.go @@ -434,15 +434,16 @@ func (s) TestADS_ResourceRequestedBeforeStreamCreation(t *testing.T) { func waitForResourceNames(ctx context.Context, t *testing.T, namesCh chan []string, wantNames []string) error { t.Helper() - for ; ctx.Err() == nil; <-time.After(defaultTestShortTimeout) { + var lastRequestedNames []string + for ; ; <-time.After(defaultTestShortTimeout) { select { case <-ctx.Done(): + return fmt.Errorf("timeout waiting for resources %v to be requested from the management server. Last requested resources: %v", wantNames, lastRequestedNames) case gotNames := <-namesCh: if cmp.Equal(gotNames, wantNames, cmpopts.EquateEmpty(), cmpopts.SortSlices(func(s1, s2 string) bool { return s1 < s2 })) { return nil } - t.Logf("Received resource names %v, want %v", gotNames, wantNames) + lastRequestedNames = gotNames } } - return fmt.Errorf("timeout waiting for resource to be requested from the management server") } diff --git a/xds/internal/xdsclient/tests/ads_stream_restart_test.go b/xds/internal/xdsclient/tests/ads_stream_restart_test.go index a74a62593f81..f0da932f5fd8 100644 --- a/xds/internal/xdsclient/tests/ads_stream_restart_test.go +++ b/xds/internal/xdsclient/tests/ads_stream_restart_test.go @@ -58,18 +58,24 @@ func (s) TestADS_ResourcesAreRequestedAfterStreamRestart(t *testing.T) { mgmtServer := e2e.StartManagementServer(t, e2e.ManagementServerOptions{ Listener: lis, OnStreamRequest: func(_ int64, req *v3discoverypb.DiscoveryRequest) error { + t.Logf("Received request for resources: %v of type %s", req.GetResourceNames(), req.GetTypeUrl()) + + // Drain the resource name channels before writing to them to ensure + // that the most recently requested names are made available to the + // test. switch req.GetTypeUrl() { case version.V3ClusterURL: select { - case cdsResourcesCh <- req.GetResourceNames(): + case <-cdsResourcesCh: default: } + cdsResourcesCh <- req.GetResourceNames() case version.V3ListenerURL: - t.Logf("Received LDS request for resources: %v", req.GetResourceNames()) select { - case ldsResourcesCh <- req.GetResourceNames(): + case <-ldsResourcesCh: default: } + ldsResourcesCh <- req.GetResourceNames() } return nil }, @@ -130,6 +136,17 @@ func (s) TestADS_ResourcesAreRequestedAfterStreamRestart(t *testing.T) { t.Fatal(err) } + // Verify the update received by the watcher. + wantListenerUpdate := listenerUpdateErrTuple{ + update: xdsresource.ListenerUpdate{ + RouteConfigName: routeConfigName, + HTTPFilters: []xdsresource.HTTPFilter{{Name: "router"}}, + }, + } + if err := verifyListenerUpdate(ctx, lw.updateCh, wantListenerUpdate); err != nil { + t.Fatal(err) + } + // Cancel the watch for the above listener resource, and verify that an LDS // request with no resource names is sent. ldsCancel() @@ -171,6 +188,11 @@ func (s) TestADS_ResourcesAreRequestedAfterStreamRestart(t *testing.T) { } defer ldsCancel() + // Verify the update received by the watcher. + if err := verifyListenerUpdate(ctx, lw.updateCh, wantListenerUpdate); err != nil { + t.Fatal(err) + } + // Create a cluster resource on the management server, in addition to the // existing listener resource. const clusterName = "cluster" @@ -192,6 +214,17 @@ func (s) TestADS_ResourcesAreRequestedAfterStreamRestart(t *testing.T) { t.Fatal(err) } + // Verify the update received by the watcher. + wantClusterUpdate := clusterUpdateErrTuple{ + update: xdsresource.ClusterUpdate{ + ClusterName: clusterName, + EDSServiceName: clusterName, + }, + } + if err := verifyClusterUpdate(ctx, cw.updateCh, wantClusterUpdate); err != nil { + t.Fatal(err) + } + // Cancel the watch for the above cluster resource, and verify that a CDS // request with no resource names is sent. cdsCancel()