diff --git a/xds/internal/balancer/balancergroup/balancergroup.go b/xds/internal/balancer/balancergroup/balancergroup.go index d646355191c3..2ec576a4b572 100644 --- a/xds/internal/balancer/balancergroup/balancergroup.go +++ b/xds/internal/balancer/balancergroup/balancergroup.go @@ -60,6 +60,8 @@ type subBalancerWrapper struct { // The static part of sub-balancer. Keeps balancerBuilders and addresses. // To be used when restarting sub-balancer. builder balancer.Builder + // Options to be passed to sub-balancer at the time of creation. + buildOpts balancer.BuildOptions // ccState is a cache of the addresses/balancer config, so when the balancer // is restarted after close, it will get the previous update. It's a pointer // and is set to nil at init, so when the balancer is built for the first @@ -94,7 +96,7 @@ func (sbc *subBalancerWrapper) updateBalancerStateWithCachedPicker() { } func (sbc *subBalancerWrapper) startBalancer() { - b := sbc.builder.Build(sbc, balancer.BuildOptions{}) + b := sbc.builder.Build(sbc, sbc.buildOpts) sbc.group.logger.Infof("Created child policy %p of type %v", b, sbc.builder.Name()) sbc.balancer = b if sbc.ccState != nil { @@ -179,6 +181,7 @@ func (sbc *subBalancerWrapper) stopBalancer() { // balancer group. type BalancerGroup struct { cc balancer.ClientConn + buildOpts balancer.BuildOptions logger *grpclog.PrefixLogger loadStore load.PerClusterReporter @@ -235,9 +238,12 @@ var DefaultSubBalancerCloseTimeout = 15 * time.Minute // New creates a new BalancerGroup. Note that the BalancerGroup // needs to be started to work. -func New(cc balancer.ClientConn, stateAggregator BalancerStateAggregator, loadStore load.PerClusterReporter, logger *grpclog.PrefixLogger) *BalancerGroup { +// +// TODO(easwars): Pass an options struct instead of N args. +func New(cc balancer.ClientConn, bOpts balancer.BuildOptions, stateAggregator BalancerStateAggregator, loadStore load.PerClusterReporter, logger *grpclog.PrefixLogger) *BalancerGroup { return &BalancerGroup{ cc: cc, + buildOpts: bOpts, logger: logger, loadStore: loadStore, @@ -305,6 +311,7 @@ func (bg *BalancerGroup) Add(id string, builder balancer.Builder) { id: id, group: bg, builder: builder, + buildOpts: bg.buildOpts, } if bg.outgoingStarted { // Only start the balancer if bg is started. Otherwise, we only keep the diff --git a/xds/internal/balancer/balancergroup/balancergroup_test.go b/xds/internal/balancer/balancergroup/balancergroup_test.go index 0474e7c722e8..0ad4bf8df10f 100644 --- a/xds/internal/balancer/balancergroup/balancergroup_test.go +++ b/xds/internal/balancer/balancergroup/balancergroup_test.go @@ -38,6 +38,8 @@ import ( "google.golang.org/grpc/balancer" "google.golang.org/grpc/balancer/roundrobin" "google.golang.org/grpc/connectivity" + "google.golang.org/grpc/credentials/insecure" + "google.golang.org/grpc/internal/balancer/stub" "google.golang.org/grpc/resolver" "google.golang.org/grpc/xds/internal/balancer/weightedtarget/weightedaggregator" "google.golang.org/grpc/xds/internal/client/load" @@ -74,7 +76,7 @@ func newTestBalancerGroup(t *testing.T, loadStore load.PerClusterReporter) (*tes cc := testutils.NewTestClientConn(t) gator := weightedaggregator.New(cc, nil, testutils.NewTestWRR) gator.Start() - bg := New(cc, gator, loadStore, nil) + bg := New(cc, balancer.BuildOptions{}, gator, loadStore, nil) bg.Start() return cc, gator, bg } @@ -501,7 +503,7 @@ func (s) TestBalancerGroup_start_close(t *testing.T) { cc := testutils.NewTestClientConn(t) gator := weightedaggregator.New(cc, nil, testutils.NewTestWRR) gator.Start() - bg := New(cc, gator, nil, nil) + bg := New(cc, balancer.BuildOptions{}, gator, nil, nil) // Add two balancers to group and send two resolved addresses to both // balancers. @@ -590,16 +592,20 @@ func (s) TestBalancerGroup_start_close(t *testing.T) { // whenever it gets an address update. It's expected that start() doesn't block // because of deadlock. func (s) TestBalancerGroup_start_close_deadlock(t *testing.T) { + const balancerName = "stub-TestBalancerGroup_start_close_deadlock" + stub.Register(balancerName, stub.BalancerFuncs{}) + builder := balancer.Get(balancerName) + cc := testutils.NewTestClientConn(t) gator := weightedaggregator.New(cc, nil, testutils.NewTestWRR) gator.Start() - bg := New(cc, gator, nil, nil) + bg := New(cc, balancer.BuildOptions{}, gator, nil, nil) gator.Add(testBalancerIDs[0], 2) - bg.Add(testBalancerIDs[0], &testutils.TestConstBalancerBuilder{}) + bg.Add(testBalancerIDs[0], builder) bg.UpdateClientConnState(testBalancerIDs[0], balancer.ClientConnState{ResolverState: resolver.State{Addresses: testBackendAddrs[0:2]}}) gator.Add(testBalancerIDs[1], 1) - bg.Add(testBalancerIDs[1], &testutils.TestConstBalancerBuilder{}) + bg.Add(testBalancerIDs[1], builder) bg.UpdateClientConnState(testBalancerIDs[1], balancer.ClientConnState{ResolverState: resolver.State{Addresses: testBackendAddrs[2:4]}}) bg.Start() @@ -695,7 +701,7 @@ func initBalancerGroupForCachingTest(t *testing.T) (*weightedaggregator.Aggregat cc := testutils.NewTestClientConn(t) gator := weightedaggregator.New(cc, nil, testutils.NewTestWRR) gator.Start() - bg := New(cc, gator, nil, nil) + bg := New(cc, balancer.BuildOptions{}, gator, nil, nil) // Add two balancers to group and send two resolved addresses to both // balancers. @@ -931,3 +937,43 @@ func (s) TestBalancerGroup_locality_caching_readd_with_different_builder(t *test t.Fatalf("want %v, got %v", want, err) } } + +// TestBalancerGroupBuildOptions verifies that the balancer.BuildOptions passed +// to the balancergroup at creation time is passed to child policies. +func (s) TestBalancerGroupBuildOptions(t *testing.T) { + const ( + balancerName = "stubBalancer-TestBalancerGroupBuildOptions" + parent = int64(1234) + userAgent = "ua" + defaultTestTimeout = 1 * time.Second + ) + + // Setup the stub balancer such that we can read the build options passed to + // it in the UpdateClientConnState method. + bOpts := balancer.BuildOptions{ + DialCreds: insecure.NewCredentials(), + ChannelzParentID: parent, + CustomUserAgent: userAgent, + } + stub.Register(balancerName, stub.BalancerFuncs{ + UpdateClientConnState: func(bd *stub.BalancerData, _ balancer.ClientConnState) error { + if !cmp.Equal(bd.BuildOptions, bOpts) { + return fmt.Errorf("buildOptions in child balancer: %v, want %v", bd, bOpts) + } + return nil + }, + }) + cc := testutils.NewTestClientConn(t) + bg := New(cc, bOpts, nil, nil, nil) + bg.Start() + + // Add the stub balancer build above as a child policy. + balancerBuilder := balancer.Get(balancerName) + bg.Add(testBalancerIDs[0], balancerBuilder) + + // Send an empty clientConn state change. This should trigger the + // verification of the buildOptions being passed to the child policy. + if err := bg.UpdateClientConnState(testBalancerIDs[0], balancer.ClientConnState{}); err != nil { + t.Fatal(err) + } +} diff --git a/xds/internal/balancer/clustermanager/clustermanager.go b/xds/internal/balancer/clustermanager/clustermanager.go index da5900ac75cf..1e4dee7f5d3a 100644 --- a/xds/internal/balancer/clustermanager/clustermanager.go +++ b/xds/internal/balancer/clustermanager/clustermanager.go @@ -40,12 +40,12 @@ func init() { type builder struct{} -func (builder) Build(cc balancer.ClientConn, _ balancer.BuildOptions) balancer.Balancer { +func (builder) Build(cc balancer.ClientConn, opts balancer.BuildOptions) balancer.Balancer { b := &bal{} b.logger = prefixLogger(b) b.stateAggregator = newBalancerStateAggregator(cc, b.logger) b.stateAggregator.start() - b.bg = balancergroup.New(cc, b.stateAggregator, nil, b.logger) + b.bg = balancergroup.New(cc, opts, b.stateAggregator, nil, b.logger) b.bg.Start() b.logger.Infof("Created") return b diff --git a/xds/internal/balancer/clustermanager/clustermanager_test.go b/xds/internal/balancer/clustermanager/clustermanager_test.go index 86c377937f36..a40d954ad64f 100644 --- a/xds/internal/balancer/clustermanager/clustermanager_test.go +++ b/xds/internal/balancer/clustermanager/clustermanager_test.go @@ -29,8 +29,11 @@ import ( "google.golang.org/grpc/balancer/roundrobin" "google.golang.org/grpc/codes" "google.golang.org/grpc/connectivity" + "google.golang.org/grpc/credentials/insecure" + "google.golang.org/grpc/internal/balancer/stub" "google.golang.org/grpc/internal/grpctest" "google.golang.org/grpc/internal/hierarchy" + itestutils "google.golang.org/grpc/internal/testutils" "google.golang.org/grpc/resolver" "google.golang.org/grpc/status" "google.golang.org/grpc/xds/internal/balancer/balancergroup" @@ -510,3 +513,55 @@ func TestRoutingConfigUpdateDeleteAll(t *testing.T) { testPick(t, p3, tt.pickInfo, tt.wantSC, tt.wantErr) } } + +func TestClusterManagerForwardsBalancerBuildOptions(t *testing.T) { + const ( + balancerName = "stubBalancer-TestClusterManagerForwardsBalancerBuildOptions" + parent = int64(1234) + userAgent = "ua" + defaultTestTimeout = 1 * time.Second + ) + + // Setup the stub balancer such that we can read the build options passed to + // it in the UpdateClientConnState method. + ccsCh := itestutils.NewChannel() + bOpts := balancer.BuildOptions{ + DialCreds: insecure.NewCredentials(), + ChannelzParentID: parent, + CustomUserAgent: userAgent, + } + stub.Register(balancerName, stub.BalancerFuncs{ + UpdateClientConnState: func(bd *stub.BalancerData, _ balancer.ClientConnState) error { + if !cmp.Equal(bd.BuildOptions, bOpts) { + err := fmt.Errorf("buildOptions in child balancer: %v, want %v", bd, bOpts) + ccsCh.Send(err) + return err + } + ccsCh.Send(nil) + return nil + }, + }) + + cc := testutils.NewTestClientConn(t) + rtb := rtBuilder.Build(cc, bOpts) + + configJSON1 := fmt.Sprintf(`{ +"children": { + "cds:cluster_1":{ "childPolicy": [{"%s":""}] } +} +}`, balancerName) + config1, err := rtParser.ParseConfig([]byte(configJSON1)) + if err != nil { + t.Fatalf("failed to parse balancer config: %v", err) + } + + if err := rtb.UpdateClientConnState(balancer.ClientConnState{BalancerConfig: config1}); err != nil { + t.Fatalf("failed to update ClientConn state: %v", err) + } + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + if v, err := ccsCh.Receive(ctx); err != nil { + err2 := v.(error) + t.Fatal(err2) + } +} diff --git a/xds/internal/balancer/edsbalancer/eds.go b/xds/internal/balancer/edsbalancer/eds.go index 12a1251abe9f..a6b37f6277a1 100644 --- a/xds/internal/balancer/edsbalancer/eds.go +++ b/xds/internal/balancer/edsbalancer/eds.go @@ -48,8 +48,8 @@ type xdsClientInterface interface { } var ( - newEDSBalancer = func(cc balancer.ClientConn, enqueueState func(priorityType, balancer.State), lw load.PerClusterReporter, logger *grpclog.PrefixLogger) edsBalancerImplInterface { - return newEDSBalancerImpl(cc, enqueueState, lw, logger) + newEDSBalancer = func(cc balancer.ClientConn, opts balancer.BuildOptions, enqueueState func(priorityType, balancer.State), lw load.PerClusterReporter, logger *grpclog.PrefixLogger) edsBalancerImplInterface { + return newEDSBalancerImpl(cc, opts, enqueueState, lw, logger) } newXDSClient = func() (xdsClientInterface, error) { return xdsclient.New() } ) @@ -61,7 +61,7 @@ func init() { type edsBalancerBuilder struct{} // Build helps implement the balancer.Builder interface. -func (b *edsBalancerBuilder) Build(cc balancer.ClientConn, _ balancer.BuildOptions) balancer.Balancer { +func (b *edsBalancerBuilder) Build(cc balancer.ClientConn, opts balancer.BuildOptions) balancer.Balancer { x := &edsBalancer{ cc: cc, closed: grpcsync.NewEvent(), @@ -80,7 +80,7 @@ func (b *edsBalancerBuilder) Build(cc balancer.ClientConn, _ balancer.BuildOptio } x.xdsClient = client - x.edsImpl = newEDSBalancer(x.cc, x.enqueueChildBalancerState, x.lsw, x.logger) + x.edsImpl = newEDSBalancer(x.cc, opts, x.enqueueChildBalancerState, x.lsw, x.logger) x.logger.Infof("Created") go x.run() return x diff --git a/xds/internal/balancer/edsbalancer/eds_impl.go b/xds/internal/balancer/edsbalancer/eds_impl.go index 8c34e4b9d436..499ea5243b64 100644 --- a/xds/internal/balancer/edsbalancer/eds_impl.go +++ b/xds/internal/balancer/edsbalancer/eds_impl.go @@ -23,6 +23,7 @@ import ( "time" "github.com/google/go-cmp/cmp" + "google.golang.org/grpc/balancer" "google.golang.org/grpc/balancer/base" "google.golang.org/grpc/balancer/roundrobin" @@ -65,6 +66,7 @@ type balancerGroupWithConfig struct { // policy is used to manage endpoints in each locality. type edsBalancerImpl struct { cc balancer.ClientConn + buildOpts balancer.BuildOptions logger *grpclog.PrefixLogger loadReporter load.PerClusterReporter @@ -102,9 +104,10 @@ type edsBalancerImpl struct { } // newEDSBalancerImpl create a new edsBalancerImpl. -func newEDSBalancerImpl(cc balancer.ClientConn, enqueueState func(priorityType, balancer.State), lr load.PerClusterReporter, logger *grpclog.PrefixLogger) *edsBalancerImpl { +func newEDSBalancerImpl(cc balancer.ClientConn, bOpts balancer.BuildOptions, enqueueState func(priorityType, balancer.State), lr load.PerClusterReporter, logger *grpclog.PrefixLogger) *edsBalancerImpl { edsImpl := &edsBalancerImpl{ cc: cc, + buildOpts: bOpts, logger: logger, subBalancerBuilder: balancer.Get(roundrobin.Name), loadReporter: lr, @@ -248,7 +251,7 @@ func (edsImpl *edsBalancerImpl) handleEDSResponse(edsResp xdsclient.EndpointsUpd ccPriorityWrapper := edsImpl.ccWrapperWithPriority(priority) stateAggregator := weightedaggregator.New(ccPriorityWrapper, edsImpl.logger, newRandomWRR) bgwc = &balancerGroupWithConfig{ - bg: balancergroup.New(ccPriorityWrapper, stateAggregator, edsImpl.loadReporter, edsImpl.logger), + bg: balancergroup.New(ccPriorityWrapper, edsImpl.buildOpts, stateAggregator, edsImpl.loadReporter, edsImpl.logger), stateAggregator: stateAggregator, configs: make(map[internal.LocalityID]*localityConfig), } diff --git a/xds/internal/balancer/edsbalancer/eds_impl_priority_test.go b/xds/internal/balancer/edsbalancer/eds_impl_priority_test.go index 1ce4f36ca73f..7696feb5bd04 100644 --- a/xds/internal/balancer/edsbalancer/eds_impl_priority_test.go +++ b/xds/internal/balancer/edsbalancer/eds_impl_priority_test.go @@ -35,7 +35,7 @@ import ( // Init 0 and 1; 0 is up, use 0; add 2, use 0; remove 2, use 0. func (s) TestEDSPriority_HighPriorityReady(t *testing.T) { cc := testutils.NewTestClientConn(t) - edsb := newEDSBalancerImpl(cc, nil, nil, nil) + edsb := newEDSBalancerImpl(cc, balancer.BuildOptions{}, nil, nil, nil) edsb.enqueueChildBalancerStateUpdate = edsb.updateState // Two localities, with priorities [0, 1], each with one backend. @@ -101,7 +101,7 @@ func (s) TestEDSPriority_HighPriorityReady(t *testing.T) { // down, use 2; remove 2, use 1. func (s) TestEDSPriority_SwitchPriority(t *testing.T) { cc := testutils.NewTestClientConn(t) - edsb := newEDSBalancerImpl(cc, nil, nil, nil) + edsb := newEDSBalancerImpl(cc, balancer.BuildOptions{}, nil, nil, nil) edsb.enqueueChildBalancerStateUpdate = edsb.updateState // Two localities, with priorities [0, 1], each with one backend. @@ -208,7 +208,7 @@ func (s) TestEDSPriority_SwitchPriority(t *testing.T) { // Init 0 and 1; 0 and 1 both down; add 2, use 2. func (s) TestEDSPriority_HigherDownWhileAddingLower(t *testing.T) { cc := testutils.NewTestClientConn(t) - edsb := newEDSBalancerImpl(cc, nil, nil, nil) + edsb := newEDSBalancerImpl(cc, balancer.BuildOptions{}, nil, nil, nil) edsb.enqueueChildBalancerStateUpdate = edsb.updateState // Two localities, with different priorities, each with one backend. @@ -271,7 +271,7 @@ func (s) TestEDSPriority_HigherDownWhileAddingLower(t *testing.T) { // Init 0,1,2; 0 and 1 down, use 2; 0 up, close 1 and 2. func (s) TestEDSPriority_HigherReadyCloseAllLower(t *testing.T) { cc := testutils.NewTestClientConn(t) - edsb := newEDSBalancerImpl(cc, nil, nil, nil) + edsb := newEDSBalancerImpl(cc, balancer.BuildOptions{}, nil, nil, nil) edsb.enqueueChildBalancerStateUpdate = edsb.updateState // Two localities, with priorities [0,1,2], each with one backend. @@ -353,7 +353,7 @@ func (s) TestEDSPriority_InitTimeout(t *testing.T) { }()() cc := testutils.NewTestClientConn(t) - edsb := newEDSBalancerImpl(cc, nil, nil, nil) + edsb := newEDSBalancerImpl(cc, balancer.BuildOptions{}, nil, nil, nil) edsb.enqueueChildBalancerStateUpdate = edsb.updateState // Two localities, with different priorities, each with one backend. @@ -403,7 +403,7 @@ func (s) TestEDSPriority_InitTimeout(t *testing.T) { // - add localities to existing p0 and p1 func (s) TestEDSPriority_MultipleLocalities(t *testing.T) { cc := testutils.NewTestClientConn(t) - edsb := newEDSBalancerImpl(cc, nil, nil, nil) + edsb := newEDSBalancerImpl(cc, balancer.BuildOptions{}, nil, nil, nil) edsb.enqueueChildBalancerStateUpdate = edsb.updateState // Two localities, with different priorities, each with one backend. @@ -514,7 +514,7 @@ func (s) TestEDSPriority_RemovesAllLocalities(t *testing.T) { }()() cc := testutils.NewTestClientConn(t) - edsb := newEDSBalancerImpl(cc, nil, nil, nil) + edsb := newEDSBalancerImpl(cc, balancer.BuildOptions{}, nil, nil, nil) edsb.enqueueChildBalancerStateUpdate = edsb.updateState // Two localities, with different priorities, each with one backend. @@ -698,7 +698,7 @@ func (s) TestPriorityTypeEqual(t *testing.T) { // will be used. func (s) TestEDSPriority_HighPriorityNoEndpoints(t *testing.T) { cc := testutils.NewTestClientConn(t) - edsb := newEDSBalancerImpl(cc, nil, nil, nil) + edsb := newEDSBalancerImpl(cc, balancer.BuildOptions{}, nil, nil, nil) edsb.enqueueChildBalancerStateUpdate = edsb.updateState // Two localities, with priorities [0, 1], each with one backend. @@ -757,7 +757,7 @@ func (s) TestEDSPriority_HighPriorityNoEndpoints(t *testing.T) { // priority will be used. func (s) TestEDSPriority_HighPriorityAllUnhealthy(t *testing.T) { cc := testutils.NewTestClientConn(t) - edsb := newEDSBalancerImpl(cc, nil, nil, nil) + edsb := newEDSBalancerImpl(cc, balancer.BuildOptions{}, nil, nil, nil) edsb.enqueueChildBalancerStateUpdate = edsb.updateState // Two localities, with priorities [0, 1], each with one backend. @@ -823,7 +823,7 @@ func (s) TestEDSPriority_FirstPriorityUnavailable(t *testing.T) { defaultPriorityInitTimeout = testPriorityInitTimeout cc := testutils.NewTestClientConn(t) - edsb := newEDSBalancerImpl(cc, nil, nil, nil) + edsb := newEDSBalancerImpl(cc, balancer.BuildOptions{}, nil, nil, nil) edsb.enqueueChildBalancerStateUpdate = edsb.updateState // One localities, with priorities [0], each with one backend. diff --git a/xds/internal/balancer/edsbalancer/eds_impl_test.go b/xds/internal/balancer/edsbalancer/eds_impl_test.go index b4349a021dca..69ce6dcac1da 100644 --- a/xds/internal/balancer/edsbalancer/eds_impl_test.go +++ b/xds/internal/balancer/edsbalancer/eds_impl_test.go @@ -30,6 +30,7 @@ import ( "google.golang.org/grpc/balancer" "google.golang.org/grpc/balancer/roundrobin" "google.golang.org/grpc/connectivity" + "google.golang.org/grpc/internal/balancer/stub" "google.golang.org/grpc/xds/internal" "google.golang.org/grpc/xds/internal/balancer/balancergroup" "google.golang.org/grpc/xds/internal/client" @@ -61,7 +62,7 @@ func init() { // - change drop rate func (s) TestEDS_OneLocality(t *testing.T) { cc := testutils.NewTestClientConn(t) - edsb := newEDSBalancerImpl(cc, nil, nil, nil) + edsb := newEDSBalancerImpl(cc, balancer.BuildOptions{}, nil, nil, nil) edsb.enqueueChildBalancerStateUpdate = edsb.updateState // One locality with one backend. @@ -182,7 +183,7 @@ func (s) TestEDS_OneLocality(t *testing.T) { // - update locality weight func (s) TestEDS_TwoLocalities(t *testing.T) { cc := testutils.NewTestClientConn(t) - edsb := newEDSBalancerImpl(cc, nil, nil, nil) + edsb := newEDSBalancerImpl(cc, balancer.BuildOptions{}, nil, nil, nil) edsb.enqueueChildBalancerStateUpdate = edsb.updateState // Two localities, each with one backend. @@ -313,7 +314,7 @@ func (s) TestEDS_TwoLocalities(t *testing.T) { // healthy ones are used. func (s) TestEDS_EndpointsHealth(t *testing.T) { cc := testutils.NewTestClientConn(t) - edsb := newEDSBalancerImpl(cc, nil, nil, nil) + edsb := newEDSBalancerImpl(cc, balancer.BuildOptions{}, nil, nil, nil) edsb.enqueueChildBalancerStateUpdate = edsb.updateState // Two localities, each 3 backend, one Healthy, one Unhealthy, one Unknown. @@ -385,7 +386,7 @@ func (s) TestEDS_EndpointsHealth(t *testing.T) { } func (s) TestClose(t *testing.T) { - edsb := newEDSBalancerImpl(nil, nil, nil, nil) + edsb := newEDSBalancerImpl(nil, balancer.BuildOptions{}, nil, nil, nil) // This is what could happen when switching between fallback and eds. This // make sure it doesn't panic. edsb.close() @@ -396,7 +397,7 @@ func (s) TestClose(t *testing.T) { // It should send an error picker with transient failure to the parent. func (s) TestEDS_EmptyUpdate(t *testing.T) { cc := testutils.NewTestClientConn(t) - edsb := newEDSBalancerImpl(cc, nil, nil, nil) + edsb := newEDSBalancerImpl(cc, balancer.BuildOptions{}, nil, nil, nil) edsb.enqueueChildBalancerStateUpdate = edsb.updateState // The first update is an empty update. @@ -456,15 +457,33 @@ func (s) TestEDS_EmptyUpdate(t *testing.T) { } // Create XDS balancer, and update sub-balancer before handling eds responses. -// Then switch between round-robin and test-const-balancer after handling first +// Then switch between round-robin and a test stub-balancer after handling first // eds response. func (s) TestEDS_UpdateSubBalancerName(t *testing.T) { + const balancerName = "stubBalancer-TestEDS_UpdateSubBalancerName" + cc := testutils.NewTestClientConn(t) - edsb := newEDSBalancerImpl(cc, nil, nil, nil) + stub.Register(balancerName, stub.BalancerFuncs{ + UpdateClientConnState: func(bd *stub.BalancerData, s balancer.ClientConnState) error { + if len(s.ResolverState.Addresses) == 0 { + return nil + } + bd.ClientConn.NewSubConn(s.ResolverState.Addresses, balancer.NewSubConnOptions{}) + return nil + }, + UpdateSubConnState: func(bd *stub.BalancerData, sc balancer.SubConn, state balancer.SubConnState) { + bd.ClientConn.UpdateState(balancer.State{ + ConnectivityState: state.ConnectivityState, + Picker: &testutils.TestConstPicker{Err: testutils.ErrTestConstPicker}, + }) + }, + }) + + edsb := newEDSBalancerImpl(cc, balancer.BuildOptions{}, nil, nil, nil) edsb.enqueueChildBalancerStateUpdate = edsb.updateState - t.Logf("update sub-balancer to test-const-balancer") - edsb.handleChildPolicy("test-const-balancer", nil) + t.Logf("update sub-balancer to stub-balancer") + edsb.handleChildPolicy(balancerName, nil) // Two localities, each with one backend. clab1 := testutils.NewClusterLoadAssignmentBuilder(testClusterNames[0], nil) @@ -506,8 +525,8 @@ func (s) TestEDS_UpdateSubBalancerName(t *testing.T) { t.Fatalf("want %v, got %v", want, err) } - t.Logf("update sub-balancer to test-const-balancer") - edsb.handleChildPolicy("test-const-balancer", nil) + t.Logf("update sub-balancer to stub-balancer") + edsb.handleChildPolicy(balancerName, nil) for i := 0; i < 2; i++ { scToRemove := <-cc.RemoveSubConnCh @@ -558,7 +577,7 @@ func (s) TestEDS_CircuitBreaking(t *testing.T) { defer func() { env.CircuitBreakingSupport = origCircuitBreakingSupport }() cc := testutils.NewTestClientConn(t) - edsb := newEDSBalancerImpl(cc, nil, nil, nil) + edsb := newEDSBalancerImpl(cc, balancer.BuildOptions{}, nil, nil, nil) edsb.enqueueChildBalancerStateUpdate = edsb.updateState edsb.updateServiceRequestsCounter("test") var maxRequests uint32 = 50 @@ -658,7 +677,7 @@ func (*testInlineUpdateBalancer) Close() { // by acquiring a locked mutex. func (s) TestEDS_ChildPolicyUpdatePickerInline(t *testing.T) { cc := testutils.NewTestClientConn(t) - edsb := newEDSBalancerImpl(cc, nil, nil, nil) + edsb := newEDSBalancerImpl(cc, balancer.BuildOptions{}, nil, nil, nil) edsb.enqueueChildBalancerStateUpdate = func(p priorityType, state balancer.State) { // For this test, euqueue needs to happen asynchronously (like in the // real implementation). @@ -759,7 +778,7 @@ func (s) TestEDS_LoadReport(t *testing.T) { lsWrapper.updateLoadStore(loadStore) cc := testutils.NewTestClientConn(t) - edsb := newEDSBalancerImpl(cc, nil, lsWrapper, nil) + edsb := newEDSBalancerImpl(cc, balancer.BuildOptions{}, nil, lsWrapper, nil) edsb.enqueueChildBalancerStateUpdate = edsb.updateState const ( @@ -853,7 +872,7 @@ func (s) TestEDS_LoadReportDisabled(t *testing.T) { // Not calling lsWrapper.updateLoadStore(loadStore) because LRS is disabled. cc := testutils.NewTestClientConn(t) - edsb := newEDSBalancerImpl(cc, nil, lsWrapper, nil) + edsb := newEDSBalancerImpl(cc, balancer.BuildOptions{}, nil, lsWrapper, nil) edsb.enqueueChildBalancerStateUpdate = edsb.updateState // One localities, with one backend. diff --git a/xds/internal/balancer/edsbalancer/eds_test.go b/xds/internal/balancer/edsbalancer/eds_test.go index 3ee19a00debe..ea5ec39c4568 100644 --- a/xds/internal/balancer/edsbalancer/eds_test.go +++ b/xds/internal/balancer/edsbalancer/eds_test.go @@ -221,7 +221,7 @@ func setup(edsLBCh *testutils.Channel) (*fakeclient.Client, func()) { newXDSClient = func() (xdsClientInterface, error) { return xdsC, nil } origNewEDSBalancer := newEDSBalancer - newEDSBalancer = func(cc balancer.ClientConn, enqueue func(priorityType, balancer.State), _ load.PerClusterReporter, logger *grpclog.PrefixLogger) edsBalancerImplInterface { + newEDSBalancer = func(cc balancer.ClientConn, _ balancer.BuildOptions, _ func(priorityType, balancer.State), _ load.PerClusterReporter, _ *grpclog.PrefixLogger) edsBalancerImplInterface { edsLB := newFakeEDSBalancer(cc) defer func() { edsLBCh.Send(edsLB) }() return edsLB diff --git a/xds/internal/balancer/weightedtarget/weightedtarget.go b/xds/internal/balancer/weightedtarget/weightedtarget.go index b0310660fb29..02b199258cd2 100644 --- a/xds/internal/balancer/weightedtarget/weightedtarget.go +++ b/xds/internal/balancer/weightedtarget/weightedtarget.go @@ -45,12 +45,12 @@ func init() { type weightedTargetBB struct{} -func (wt *weightedTargetBB) Build(cc balancer.ClientConn, _ balancer.BuildOptions) balancer.Balancer { +func (wt *weightedTargetBB) Build(cc balancer.ClientConn, bOpts balancer.BuildOptions) balancer.Balancer { b := &weightedTargetBalancer{} b.logger = prefixLogger(b) b.stateAggregator = weightedaggregator.New(cc, b.logger, newRandomWRR) b.stateAggregator.Start() - b.bg = balancergroup.New(cc, b.stateAggregator, nil, b.logger) + b.bg = balancergroup.New(cc, bOpts, b.stateAggregator, nil, b.logger) b.bg.Start() b.logger.Infof("Created") return b diff --git a/xds/internal/client/v3/client.go b/xds/internal/client/v3/client.go index de34da819639..85de8d584a4f 100644 --- a/xds/internal/client/v3/client.go +++ b/xds/internal/client/v3/client.go @@ -143,7 +143,7 @@ func (v3c *client) RecvResponse(s grpc.ClientStream) (proto.Message, error) { return nil, fmt.Errorf("xds: stream.Recv() failed: %v", err) } v3c.logger.Infof("ADS response received, type: %v", resp.GetTypeUrl()) - v3c.logger.Debugf("ADS response received: %v", resp) + v3c.logger.Debugf("ADS response received: %+v", resp) return resp, nil } diff --git a/xds/internal/testutils/balancer.go b/xds/internal/testutils/balancer.go index 69e355e353c2..f49ba85606ff 100644 --- a/xds/internal/testutils/balancer.go +++ b/xds/internal/testutils/balancer.go @@ -232,49 +232,9 @@ func (tc *testClosure) next() balancer.SubConn { return ret } -func init() { - balancer.Register(&TestConstBalancerBuilder{}) -} - // ErrTestConstPicker is error returned by test const picker. var ErrTestConstPicker = fmt.Errorf("const picker error") -// TestConstBalancerBuilder is a balancer builder for tests. -type TestConstBalancerBuilder struct{} - -// Build builds a test const balancer. -func (*TestConstBalancerBuilder) Build(cc balancer.ClientConn, opts balancer.BuildOptions) balancer.Balancer { - return &testConstBalancer{cc: cc} -} - -// Name returns test-const-balancer name. -func (*TestConstBalancerBuilder) Name() string { - return "test-const-balancer" -} - -type testConstBalancer struct { - cc balancer.ClientConn -} - -func (tb *testConstBalancer) UpdateSubConnState(sc balancer.SubConn, state balancer.SubConnState) { - tb.cc.UpdateState(balancer.State{ConnectivityState: connectivity.Ready, Picker: &TestConstPicker{Err: ErrTestConstPicker}}) -} - -func (tb *testConstBalancer) ResolverError(error) { - panic("not implemented") -} - -func (tb *testConstBalancer) UpdateClientConnState(s balancer.ClientConnState) error { - if len(s.ResolverState.Addresses) == 0 { - return nil - } - tb.cc.NewSubConn(s.ResolverState.Addresses, balancer.NewSubConnOptions{}) - return nil -} - -func (*testConstBalancer) Close() { -} - // TestConstPicker is a const picker for tests. type TestConstPicker struct { Err error