diff --git a/test/pickfirst_leaf_test.go b/test/pickfirst_leaf_test.go
index 033ee5bb0b33..4c2627697447 100644
--- a/test/pickfirst_leaf_test.go
+++ b/test/pickfirst_leaf_test.go
@@ -31,6 +31,7 @@ import (
 	"google.golang.org/grpc/connectivity"
 	"google.golang.org/grpc/credentials/insecure"
 	"google.golang.org/grpc/internal/stubserver"
+	"google.golang.org/grpc/internal/testutils/pickfirst"
 	"google.golang.org/grpc/resolver"
 	"google.golang.org/grpc/resolver/manual"
 	"google.golang.org/grpc/status"
@@ -39,7 +40,7 @@ import (
 	testpb "google.golang.org/grpc/interop/grpc_testing"
 )
 
-var pickFirstLeafServiceConfig = fmt.Sprintf(`{"loadBalancingConfig": [{"%s":{}}]}`, pickfirstleaf.PickFirstLeafName)
+var stateStoringServiceConfig = fmt.Sprintf(`{"loadBalancingConfig": [{"%s":{}}]}`, stateStoringBalancerName)
 
 const stateStoringBalancerName = "state_storing"
 
@@ -72,7 +73,7 @@ func setupPickFirstLeaf(t *testing.T, backendCount int, opts ...grpc.DialOption)
 	dopts := []grpc.DialOption{
 		grpc.WithTransportCredentials(insecure.NewCredentials()),
 		grpc.WithResolvers(r),
-		grpc.WithDefaultServiceConfig(pickFirstLeafServiceConfig),
+		grpc.WithDefaultServiceConfig(stateStoringServiceConfig),
 	}
 	dopts = append(dopts, opts...)
 	cc, err := grpc.NewClient(r.Scheme()+":///test.server", dopts...)
@@ -92,11 +93,223 @@ func setupPickFirstLeaf(t *testing.T, backendCount int, opts ...grpc.DialOption)
 	return cc, r, backends
 }
 
+// TestPickFirstLeaf_ResolverUpdate tests the behaviour of the new pick first
+// policy when servers are brought down and resolver updates are received.
+func (s) TestPickFirstLeaf_ResolverUpdate(t *testing.T) {
+	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
+	defer cancel()
+	balChan := make(chan *stateStoringBalancer, 1)
+	balancer.Register(&stateStoringBalancerBuilder{balancerChan: balChan})
+	tests := []struct {
+		name                      string
+		backendCount              int
+		initialBackendIndexes     []int
+		initialTargetBackendIndex int
+		wantScStates              []connectivity.State
+		updatedBackendIndexes     []int
+		updatedTargetBackendIndex int
+		wantScStatesPostUpdate    []connectivity.State
+		restartConnected          bool
+	}{
+		{
+			name:                      "two_server_first_ready",
+			backendCount:              2,
+			initialBackendIndexes:     []int{0, 1},
+			initialTargetBackendIndex: 0,
+			wantScStates:              []connectivity.State{connectivity.Ready},
+		},
+		{
+			name:                      "two_server_second_ready",
+			backendCount:              2,
+			initialBackendIndexes:     []int{0, 1},
+			initialTargetBackendIndex: 1,
+			wantScStates:              []connectivity.State{connectivity.Shutdown, connectivity.Ready},
+		},
+		{
+			name:                      "duplicate_address",
+			backendCount:              2,
+			initialBackendIndexes:     []int{0, 0, 1},
+			initialTargetBackendIndex: 1,
+			wantScStates:              []connectivity.State{connectivity.Shutdown, connectivity.Ready},
+		},
+		{
+			name:                      "disjoint_updated_addresses",
+			backendCount:              4,
+			initialBackendIndexes:     []int{0, 1},
+			initialTargetBackendIndex: 1,
+			wantScStates:              []connectivity.State{connectivity.Shutdown, connectivity.Ready},
+			updatedBackendIndexes:     []int{2, 3},
+			updatedTargetBackendIndex: 3,
+			wantScStatesPostUpdate:    []connectivity.State{connectivity.Shutdown, connectivity.Shutdown, connectivity.Shutdown, connectivity.Ready},
+		},
+		{
+			name:                      "active_backend_in_updated_list",
+			backendCount:              3,
+			initialBackendIndexes:     []int{0, 1},
+			initialTargetBackendIndex: 1,
+			wantScStates:              []connectivity.State{connectivity.Shutdown, connectivity.Ready},
+			updatedBackendIndexes:     []int{1, 2},
+			updatedTargetBackendIndex: 1,
+			wantScStatesPostUpdate:    []connectivity.State{connectivity.Shutdown, connectivity.Ready},
+		},
+		{
+			name:                      "inactive_backend_in_updated_list",
+			backendCount:              3,
+			initialBackendIndexes:     []int{0, 1},
+			initialTargetBackendIndex: 1,
+			wantScStates:              []connectivity.State{connectivity.Shutdown, connectivity.Ready},
+			updatedBackendIndexes:     []int{0, 2},
+			updatedTargetBackendIndex: 0,
+			wantScStatesPostUpdate:    []connectivity.State{connectivity.Shutdown, connectivity.Shutdown, connectivity.Ready},
+		},
+		{
+			name:                      "identical_list",
+			backendCount:              2,
+			initialBackendIndexes:     []int{0, 1},
+			initialTargetBackendIndex: 1,
+			wantScStates:              []connectivity.State{connectivity.Shutdown, connectivity.Ready},
+			updatedBackendIndexes:     []int{0, 1},
+			updatedTargetBackendIndex: 1,
+			wantScStatesPostUpdate:    []connectivity.State{connectivity.Shutdown, connectivity.Ready},
+		},
+		{
+			name:                      "first_connected_idle_reconnect",
+			backendCount:              2,
+			initialBackendIndexes:     []int{0, 1},
+			initialTargetBackendIndex: 0,
+			restartConnected:          true,
+			wantScStates:              []connectivity.State{connectivity.Ready},
+			updatedBackendIndexes:     []int{0, 1},
+			updatedTargetBackendIndex: 0,
+			wantScStatesPostUpdate:    []connectivity.State{connectivity.Ready},
+		},
+		{
+			name:                      "second_connected_idle_reconnect",
+			backendCount:              2,
+			initialBackendIndexes:     []int{0, 1},
+			initialTargetBackendIndex: 1,
+			restartConnected:          true,
+			wantScStates:              []connectivity.State{connectivity.Shutdown, connectivity.Ready},
+			updatedBackendIndexes:     []int{0, 1},
+			updatedTargetBackendIndex: 1,
+			wantScStatesPostUpdate:    []connectivity.State{connectivity.Shutdown, connectivity.Ready, connectivity.Shutdown},
+		},
+		{
+			name:                      "second_connected_idle_reconnect_first",
+			backendCount:              2,
+			initialBackendIndexes:     []int{0, 1},
+			initialTargetBackendIndex: 1,
+			restartConnected:          true,
+			wantScStates:              []connectivity.State{connectivity.Shutdown, connectivity.Ready},
+			updatedBackendIndexes:     []int{0, 1},
+			updatedTargetBackendIndex: 0,
+			wantScStatesPostUpdate:    []connectivity.State{connectivity.Shutdown, connectivity.Shutdown, connectivity.Ready},
+		},
+		{
+			name:                      "first_connected_idle_reconnect_second",
+			backendCount:              2,
+			initialBackendIndexes:     []int{0, 1},
+			initialTargetBackendIndex: 0,
+			restartConnected:          true,
+			wantScStates:              []connectivity.State{connectivity.Ready},
+			updatedBackendIndexes:     []int{0, 1},
+			updatedTargetBackendIndex: 1,
+			wantScStatesPostUpdate:    []connectivity.State{connectivity.Shutdown, connectivity.Ready},
+		},
+	}
+
+	for _, tc := range tests {
+		t.Run(tc.name, func(t *testing.T) {
+			cc, r, backends := setupPickFirstLeaf(t, tc.backendCount)
+
+			activeBackends := []*stubserver.StubServer{}
+			for _, idx := range tc.initialBackendIndexes {
+				activeBackends = append(activeBackends, backends[idx])
+			}
+			addrs := stubBackendsToResolverAddrs(activeBackends)
+			r.UpdateState(resolver.State{Addresses: addrs})
+
+			// Shut down all active backends except the target.
+			var targetAddr resolver.Address
+			for i, idx := range tc.initialBackendIndexes {
+				if idx == tc.initialTargetBackendIndex {
+					targetAddr = addrs[i]
+					continue
+				}
+				backends[idx].S.Stop()
+			}
+
+			if err := pickfirst.CheckRPCsToBackend(ctx, cc, targetAddr); err != nil {
+				t.Fatal(err)
+			}
+			bal := <-balChan
+			scs := bal.subConns()
+
+			if got, want := len(scs), len(tc.wantScStates); got != want {
+				t.Fatalf("len(subconns) = %d, want %d", got, want)
+			}
+
+			for idx := range scs {
+				if got, want := scs[idx].state, tc.wantScStates[idx]; got != want {
+					t.Errorf("subconn[%d].state = %v, want = %v", idx, got, want)
+				}
+			}
+
+			if len(tc.updatedBackendIndexes) == 0 {
+				return
+			}
+
+			// Restart all the backends.
+			for i, s := range backends {
+				if !tc.restartConnected && i == tc.initialTargetBackendIndex {
+					continue
+				}
+				s.S.Stop()
+				if err := s.StartServer(); err != nil {
+					t.Fatalf("Failed to re-start test backend: %v", err)
+				}
+			}
+
+			activeBackends = []*stubserver.StubServer{}
+			for _, idx := range tc.updatedBackendIndexes {
+				activeBackends = append(activeBackends, backends[idx])
+			}
+			addrs = stubBackendsToResolverAddrs(activeBackends)
+			r.UpdateState(resolver.State{Addresses: addrs})
+
+			// Shut down all active backends except the target.
+			for i, idx := range tc.updatedBackendIndexes {
+				if idx == tc.updatedTargetBackendIndex {
+					targetAddr = addrs[i]
+					continue
+				}
+				backends[idx].S.Stop()
+			}
+
+			if err := pickfirst.CheckRPCsToBackend(ctx, cc, targetAddr); err != nil {
+				t.Fatal(err)
+			}
+			scs = bal.subConns()
+
+			if got, want := len(scs), len(tc.wantScStatesPostUpdate); got != want {
+				t.Fatalf("len(subconns) = %d, want %d", got, want)
+			}
+
+			for idx := range scs {
+				if got, want := scs[idx].state, tc.wantScStatesPostUpdate[idx]; got != want {
+					t.Errorf("subconn[%d].state = %v, want = %v", idx, got, want)
+				}
+			}
+
+		})
+	}
+}
+
+// stateStoringBalancer stores the state of the subconns being created.
 type stateStoringBalancer struct {
 	balancer.Balancer
 	mu       sync.Mutex
 	scStates []*scState
-	ccState  connectivity.State
 }
 
 func (b *stateStoringBalancer) Close() {
@@ -110,10 +323,7 @@ func (b *stateStoringBalancer) ExitIdle() {
 }
 
 type stateStoringBalancerBuilder struct {
-}
-
-func newStateStoringBalancerBuilder() *stateStoringBalancerBuilder {
-	return &stateStoringBalancerBuilder{}
+	balancerChan chan *stateStoringBalancer
 }
 
 func (b *stateStoringBalancerBuilder) Name() string {
@@ -123,6 +333,7 @@ func (b *stateStoringBalancerBuilder) Name() string {
 func (b *stateStoringBalancerBuilder) Build(cc balancer.ClientConn, opts balancer.BuildOptions) balancer.Balancer {
 	bal := &stateStoringBalancer{}
 	bal.Balancer = balancer.Get(pickfirstleaf.PickFirstLeafName).Build(&stateStoringCCWrapper{cc, bal}, opts)
+	b.balancerChan <- bal
 	return bal
 }
 
@@ -136,25 +347,12 @@ func (b *stateStoringBalancer) subConns() []scState {
 	return ret
 }
 
-func (b *stateStoringBalancer) setCCState(state connectivity.State) {
-	b.mu.Lock()
-	b.ccState = state
-	b.mu.Unlock()
-}
-
 func (b *stateStoringBalancer) addScState(state *scState) {
 	b.mu.Lock()
 	b.scStates = append(b.scStates, state)
 	b.mu.Unlock()
 }
 
-func (b *stateStoringBalancer) curCCState() connectivity.State {
-	b.mu.Lock()
-	ret := b.ccState
-	b.mu.Unlock()
-	return ret
-}
-
 type stateStoringCCWrapper struct {
 	balancer.ClientConn
 	b *stateStoringBalancer
@@ -176,11 +374,6 @@ func (ccw *stateStoringCCWrapper) NewSubConn(addrs []resolver.Address, opts bala
 	return ccw.ClientConn.NewSubConn(addrs, opts)
 }
 
-func (ccw *stateStoringCCWrapper) UpdateState(state balancer.State) {
-	ccw.b.setCCState(state.ConnectivityState)
-	ccw.ClientConn.UpdateState(state)
-}
-
 type scState struct {
 	state connectivity.State
 	addrs []resolver.Address
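The final hunk only shows the tail of stateStoringCCWrapper.NewSubConn, so the mechanism that fills in each scState is not visible here. As a reading aid, the sketch below shows one common way such a ClientConn wrapper records per-SubConn state transitions: intercept opts.StateListener before delegating to the embedded balancer.ClientConn. It reuses the types defined in the diff (stateStoringCCWrapper, scState, addScState), but the listener-wrapping details are an assumption for illustration, not the file's actual NewSubConn body.

```go
// Sketch only: illustrates how a ClientConn wrapper can record SubConn state
// changes; the real implementation in the diff above is elided.
func (ccw *stateStoringCCWrapper) NewSubConn(addrs []resolver.Address, opts balancer.NewSubConnOptions) (balancer.SubConn, error) {
	// Track the new SubConn, starting in IDLE.
	sc := &scState{
		state: connectivity.Idle,
		addrs: addrs,
	}
	ccw.b.addScState(sc)

	// Wrap the child policy's StateListener so every transition is recorded
	// before being forwarded to the wrapped pick_first_leaf balancer.
	origListener := opts.StateListener
	opts.StateListener = func(state balancer.SubConnState) {
		ccw.b.mu.Lock()
		sc.state = state.ConnectivityState
		ccw.b.mu.Unlock()
		if origListener != nil {
			origListener(state)
		}
	}
	return ccw.ClientConn.NewSubConn(addrs, opts)
}
```

This is also why the test reads states through bal.subConns() rather than touching the slice directly: the copy is taken under the same mutex that the listener would use to update each scState.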