diff --git a/balancer_conn_wrappers.go b/balancer_conn_wrappers.go index 41061d6d3dc5..4cc7f9159b16 100644 --- a/balancer_conn_wrappers.go +++ b/balancer_conn_wrappers.go @@ -205,7 +205,7 @@ func (acbw *acBalancerWrapper) UpdateAddresses(addrs []resolver.Address) { acbw.mu.Lock() defer acbw.mu.Unlock() if len(addrs) <= 0 { - acbw.ac.tearDown(errConnDrain) + acbw.ac.cc.removeAddrConn(acbw.ac, errConnDrain) return } if !acbw.ac.tryUpdateAddrs(addrs) { @@ -220,7 +220,7 @@ func (acbw *acBalancerWrapper) UpdateAddresses(addrs []resolver.Address) { acbw.ac.acbw = nil acbw.ac.mu.Unlock() acState := acbw.ac.getState() - acbw.ac.tearDown(errConnDrain) + acbw.ac.cc.removeAddrConn(acbw.ac, errConnDrain) if acState == connectivity.Shutdown { return diff --git a/clientconn.go b/clientconn.go index 77a08fd33bf8..eab61dce0e05 100644 --- a/clientconn.go +++ b/clientconn.go @@ -143,6 +143,7 @@ func DialContext(ctx context.Context, target string, opts ...DialOption) (conn * firstResolveEvent: grpcsync.NewEvent(), } cc.retryThrottler.Store((*retryThrottler)(nil)) + cc.safeConfigSelector.UpdateConfigSelector(&defaultConfigSelector{nil}) cc.ctx, cc.cancel = context.WithCancel(context.Background()) for _, opt := range opts { @@ -1446,10 +1447,9 @@ func (ac *addrConn) getReadyTransport() (transport.ClientTransport, bool) { } // tearDown starts to tear down the addrConn. -// TODO(zhaoq): Make this synchronous to avoid unbounded memory consumption in -// some edge cases (e.g., the caller opens and closes many addrConn's in a -// tight loop. -// tearDown doesn't remove ac from ac.cc.conns. +// +// Note that tearDown doesn't remove ac from ac.cc.conns, so the addrConn struct +// will leak. In most cases, call cc.removeAddrConn() instead. func (ac *addrConn) tearDown(err error) { ac.mu.Lock() if ac.state == connectivity.Shutdown { diff --git a/xds/internal/resolver/watch_service.go b/xds/internal/resolver/watch_service.go index 913ac4ced15c..8b1b9b7a47ef 100644 --- a/xds/internal/resolver/watch_service.go +++ b/xds/internal/resolver/watch_service.go @@ -116,7 +116,15 @@ func (w *serviceUpdateWatcher) handleLDSResp(update xdsclient.ListenerUpdate, er // // If the route name did change, then we must wait until the first RDS // update before reporting this LDS config. - w.serviceCb(w.lastUpdate, nil) + if w.lastUpdate.virtualHost != nil { + // We want to send an update with the new fields from the new LDS + // (e.g. max stream duration), and old fields from the the previous + // RDS. + // + // But note that this should only happen when virtual host is set, + // which means an RDS was received. + w.serviceCb(w.lastUpdate, nil) + } return } w.rdsName = update.RouteConfigName diff --git a/xds/internal/resolver/xds_resolver_test.go b/xds/internal/resolver/xds_resolver_test.go index 36c7416cb436..82355eecc70b 100644 --- a/xds/internal/resolver/xds_resolver_test.go +++ b/xds/internal/resolver/xds_resolver_test.go @@ -1030,6 +1030,48 @@ func (s) TestXDSResolverResourceNotFoundError(t *testing.T) { } } +// TestXDSResolverMultipleLDSUpdates tests the case where two LDS updates with +// the same RDS name to watch are received without an RDS in between. Those LDS +// updates shouldn't trigger service config update. +// +// This test case also makes sure the resolver doesn't panic. +func (s) TestXDSResolverMultipleLDSUpdates(t *testing.T) { + xdsC := fakeclient.NewClient() + xdsR, tcc, cancel := testSetup(t, setupOpts{ + xdsClientFunc: func() (xdsClientInterface, error) { return xdsC, nil }, + }) + defer func() { + cancel() + xdsR.Close() + }() + + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + waitForWatchListener(ctx, t, xdsC, targetStr) + xdsC.InvokeWatchListenerCallback(xdsclient.ListenerUpdate{RouteConfigName: routeStr, HTTPFilters: routerFilterList}, nil) + waitForWatchRouteConfig(ctx, t, xdsC, routeStr) + defer replaceRandNumGenerator(0)() + + // Send a new LDS update, with the same fields. + xdsC.InvokeWatchListenerCallback(xdsclient.ListenerUpdate{RouteConfigName: routeStr, HTTPFilters: routerFilterList}, nil) + ctx, cancel = context.WithTimeout(context.Background(), defaultTestShortTimeout) + defer cancel() + // Should NOT trigger a state update. + gotState, err := tcc.stateCh.Receive(ctx) + if err == nil { + t.Fatalf("ClientConn.UpdateState received %v, want timeout error", gotState) + } + + // Send a new LDS update, with the same RDS name, but different fields. + xdsC.InvokeWatchListenerCallback(xdsclient.ListenerUpdate{RouteConfigName: routeStr, MaxStreamDuration: time.Second, HTTPFilters: routerFilterList}, nil) + ctx, cancel = context.WithTimeout(context.Background(), defaultTestShortTimeout) + defer cancel() + gotState, err = tcc.stateCh.Receive(ctx) + if err == nil { + t.Fatalf("ClientConn.UpdateState received %v, want timeout error", gotState) + } +} + type filterBuilder struct { httpfilter.Filter // embedded as we do not need to implement registry / parsing in this test. path *[]string