diff --git a/balancer/rls/balancer.go b/balancer/rls/balancer.go
index f1ed6f25587f..5ae4d2e13167 100644
--- a/balancer/rls/balancer.go
+++ b/balancer/rls/balancer.go
@@ -322,14 +322,7 @@ func (b *rlsBalancer) UpdateClientConnState(ccs balancer.ClientConnState) error
 	// Update the copy of the config in the LB policy before releasing the lock.
 	b.lbCfg = newCfg
-
-	// Enqueue an event which will notify us when the above update has been
-	// propagated to all child policies, and the child policies have all
-	// processed their updates, and we have sent a picker update.
-	done := make(chan struct{})
-	b.updateCh.Put(resumePickerUpdates{done: done})
 	b.stateMu.Unlock()
-	<-done
 
 	// We cannot do cache operations above because `cacheMu` needs to be grabbed
 	// before `stateMu` if we are to hold both locks at the same time.
@@ -338,10 +331,18 @@ func (b *rlsBalancer) UpdateClientConnState(ccs balancer.ClientConnState) error
 	if resizeCache {
 		// If the new config changes reduces the size of the data cache, we
 		// might have to evict entries to get the cache size down to the newly
-		// specified size.
+		// specified size. If we do evict an entry with a valid backoff timer,
+		// the new picker needs to be sent to the channel to re-process any
+		// RPCs queued as a result of this backoff timer.
 		b.dataCache.resize(newCfg.cacheSizeBytes)
 	}
 	b.cacheMu.Unlock()
+
+	// Enqueue an event which will notify us when the above update has been
+	// propagated to all child policies, and the child policies have all
+	// processed their updates, and we have sent a picker update.
+	done := make(chan struct{})
+	b.updateCh.Put(resumePickerUpdates{done: done})
+	<-done
 	return nil
 }
 
diff --git a/balancer/rls/balancer_test.go b/balancer/rls/balancer_test.go
index 6930e3da14f5..3e69a51f9275 100644
--- a/balancer/rls/balancer_test.go
+++ b/balancer/rls/balancer_test.go
@@ -652,6 +652,179 @@ func (s) TestConfigUpdate_DataCacheSizeDecrease(t *testing.T) {
 	verifyRLSRequest(t, rlsReqCh, true)
 }
 
+// Test that when a data cache entry is evicted due to a config change
+// in cache size, the picker is updated accordingly.
+func (s) TestPickerUpdateOnDataCacheSizeDecrease(t *testing.T) {
+	// Override the clientConn update hook to get notified.
+	clientConnUpdateDone := make(chan struct{}, 1)
+	origClientConnUpdateHook := clientConnUpdateHook
+	clientConnUpdateHook = func() { clientConnUpdateDone <- struct{}{} }
+	defer func() { clientConnUpdateHook = origClientConnUpdateHook }()
+
+	// Override the cache entry size func, and always return 1.
+	origEntrySizeFunc := computeDataCacheEntrySize
+	computeDataCacheEntrySize = func(cacheKey, *cacheEntry) int64 { return 1 }
+	defer func() { computeDataCacheEntrySize = origEntrySizeFunc }()
+
+	// Override the backoff strategy to return a large backoff which
+	// will make sure the data cache entry remains in backoff for the
+	// duration of the test.
+	origBackoffStrategy := defaultBackoffStrategy
+	defaultBackoffStrategy = &fakeBackoffStrategy{backoff: defaultTestTimeout}
+	defer func() { defaultBackoffStrategy = origBackoffStrategy }()
+
+	// Override the minEvictionDuration to ensure that when the config update
+	// reduces the cache size, the resize operation is not stopped because
+	// we find an entry whose minExpiryDuration has not elapsed.
+	origMinEvictDuration := minEvictDuration
+	minEvictDuration = time.Duration(0)
+	defer func() { minEvictDuration = origMinEvictDuration }()
+
+	// Register the top-level wrapping balancer which forwards calls to RLS.
+	topLevelBalancerName := t.Name() + "top-level"
+	var ccWrapper *testCCWrapper
+	stub.Register(topLevelBalancerName, stub.BalancerFuncs{
+		Init: func(bd *stub.BalancerData) {
+			ccWrapper = &testCCWrapper{ClientConn: bd.ClientConn}
+			bd.Data = balancer.Get(Name).Build(ccWrapper, bd.BuildOptions)
+		},
+		ParseConfig: func(sc json.RawMessage) (serviceconfig.LoadBalancingConfig, error) {
+			parser := balancer.Get(Name).(balancer.ConfigParser)
+			return parser.ParseConfig(sc)
+		},
+		UpdateClientConnState: func(bd *stub.BalancerData, ccs balancer.ClientConnState) error {
+			bal := bd.Data.(balancer.Balancer)
+			return bal.UpdateClientConnState(ccs)
+		},
+		Close: func(bd *stub.BalancerData) {
+			bal := bd.Data.(balancer.Balancer)
+			bal.Close()
+		},
+	})
+
+	// Start an RLS server and set the throttler to never throttle requests.
+	rlsServer, rlsReqCh := rlstest.SetupFakeRLSServer(t, nil)
+	overrideAdaptiveThrottler(t, neverThrottlingThrottler())
+
+	// Register an LB policy to act as the child policy for RLS LB policy.
+	childPolicyName := "test-child-policy" + t.Name()
+	e2e.RegisterRLSChildPolicy(childPolicyName, nil)
+	t.Logf("Registered child policy with name %q", childPolicyName)
+
+	// Start a couple of test backends, and set up the fake RLS server to return
+	// these as targets in the RLS response, based on request keys.
+	backendCh1, backendAddress1 := startBackend(t)
+	backendCh2, backendAddress2 := startBackend(t)
+	rlsServer.SetResponseCallback(func(ctx context.Context, req *rlspb.RouteLookupRequest) *rlstest.RouteLookupResponse {
+		if req.KeyMap["k1"] == "v1" {
+			return &rlstest.RouteLookupResponse{Resp: &rlspb.RouteLookupResponse{Targets: []string{backendAddress1}}}
+		}
+		if req.KeyMap["k2"] == "v2" {
+			return &rlstest.RouteLookupResponse{Resp: &rlspb.RouteLookupResponse{Targets: []string{backendAddress2}}}
+		}
+		return &rlstest.RouteLookupResponse{Err: errors.New("no keys in request metadata")}
+	})
+
+	// Register a manual resolver and push the RLS service config through it.
+	r := manual.NewBuilderWithScheme("rls-e2e")
+	headers := `
+	[
+		{
+			"key": "k1",
+			"names": [
+				"n1"
+			]
+		},
+		{
+			"key": "k2",
+			"names": [
+				"n2"
+			]
+		}
+	]
+	`
+
+	configJSON := `
+	{
+		"loadBalancingConfig": [
+			{
+				"%s": {
+					"routeLookupConfig": {
+						"grpcKeybuilders": [{
+							"names": [{"service": "grpc.testing.TestService"}],
+							"headers": %s
+						}],
+						"lookupService": "%s",
+						"cacheSizeBytes": %d
+					},
+					"childPolicy": [{"%s": {}}],
+					"childPolicyConfigTargetFieldName": "Backend"
+				}
+			}
+		]
+	}`
+	scJSON := fmt.Sprintf(configJSON, topLevelBalancerName, headers, rlsServer.Address, 1000, childPolicyName)
+	sc := internal.ParseServiceConfig.(func(string) *serviceconfig.ParseResult)(scJSON)
+	r.InitialState(resolver.State{ServiceConfig: sc})
+
+	cc, err := grpc.Dial(r.Scheme()+":///", grpc.WithResolvers(r), grpc.WithTransportCredentials(insecure.NewCredentials()))
+	if err != nil {
+		t.Fatalf("grpc.Dial() failed: %v", err)
+	}
+	defer cc.Close()
+
+	<-clientConnUpdateDone
+
+	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
+	defer cancel()
+	// Make an RPC with empty metadata. None of the keys in the rlsServer
+	// response callback defined above will match, so the RLS request fails
+	// and the corresponding data cache entry is placed in backoff.
+	makeTestRPCAndVerifyError(ctx, t, cc, codes.Unavailable, nil)
+
+	ctxOutgoing := metadata.AppendToOutgoingContext(ctx, "n1", "v1")
+	makeTestRPCAndExpectItToReachBackend(ctxOutgoing, t, cc, backendCh1)
+	verifyRLSRequest(t, rlsReqCh, true)
+
+	ctxOutgoing = metadata.AppendToOutgoingContext(ctx, "n2", "v2")
+	makeTestRPCAndExpectItToReachBackend(ctxOutgoing, t, cc, backendCh2)
+	verifyRLSRequest(t, rlsReqCh, true)
+
+	initialStateCnt := len(ccWrapper.getStates())
+	// Reducing the cache size to 2 bytes (each entry costs 1 byte) forces the
+	// eviction of the oldest entry, the one in backoff, which must trigger a
+	// new picker update.
+	scJSON1 := fmt.Sprintf(`
+{
+	"loadBalancingConfig": [
+		{
+			"%s": {
+				"routeLookupConfig": {
+					"grpcKeybuilders": [{
+						"names": [{"service": "grpc.testing.TestService"}],
+						"headers": %s
+					}],
+					"lookupService": "%s",
+					"cacheSizeBytes": 2
+				},
+				"childPolicy": [{"%s": {}}],
+				"childPolicyConfigTargetFieldName": "Backend"
+			}
+		}
+	]
+}`, topLevelBalancerName, headers, rlsServer.Address, childPolicyName)
+	sc1 := internal.ParseServiceConfig.(func(string) *serviceconfig.ParseResult)(scJSON1)
+	r.UpdateState(resolver.State{ServiceConfig: sc1})
+	<-clientConnUpdateDone
+	finalStateCnt := len(ccWrapper.getStates())
+
+	if finalStateCnt != initialStateCnt+1 {
+		t.Errorf("Unexpected balancer state count: got %v, want %v", finalStateCnt, initialStateCnt+1)
+	}
+}
+
 // TestDataCachePurging verifies that the LB policy periodically evicts expired
 // entries from the data cache.
 func (s) TestDataCachePurging(t *testing.T) {