diff --git a/client/tso_client.go b/client/tso_client.go index f1538a7f164..6801aee3a11 100644 --- a/client/tso_client.go +++ b/client/tso_client.go @@ -386,7 +386,9 @@ func (c *tsoClient) tryConnectToTSO( cc *grpc.ClientConn updateAndClear = func(newURL string, connectionCtx *tsoConnectionContext) { // Only store the `connectionCtx` if it does not exist before. - connectionCtxs.LoadOrStore(newURL, connectionCtx) + if connectionCtx != nil { + connectionCtxs.LoadOrStore(newURL, connectionCtx) + } // Remove all other `connectionCtx`s. connectionCtxs.Range(func(url, cc any) bool { if url.(string) != newURL { @@ -405,6 +407,8 @@ func (c *tsoClient) tryConnectToTSO( c.svcDiscovery.ScheduleCheckMemberChanged() cc, url = c.GetTSOAllocatorClientConnByDCLocation(dc) if _, ok := connectionCtxs.Load(url); ok { + // Just trigger the clean up of the stale connection contexts. + updateAndClear(url, nil) return nil } if cc != nil { diff --git a/tests/integrations/client/client_test.go b/tests/integrations/client/client_test.go index 9f0b5f8d523..9574918a74a 100644 --- a/tests/integrations/client/client_test.go +++ b/tests/integrations/client/client_test.go @@ -361,7 +361,8 @@ func TestTSOFollowerProxy(t *testing.T) { defer cli1.Close() cli2 := setupCli(ctx, re, endpoints) defer cli2.Close() - cli2.UpdateOption(pd.EnableTSOFollowerProxy, true) + err = cli2.UpdateOption(pd.EnableTSOFollowerProxy, true) + re.NoError(err) var wg sync.WaitGroup wg.Add(tsoRequestConcurrencyNumber) @@ -385,6 +386,39 @@ func TestTSOFollowerProxy(t *testing.T) { }() } wg.Wait() + + // Disable the follower proxy and check if the stream is updated. + err = cli2.UpdateOption(pd.EnableTSOFollowerProxy, false) + re.NoError(err) + + wg.Add(tsoRequestConcurrencyNumber) + for i := 0; i < tsoRequestConcurrencyNumber; i++ { + go func() { + defer wg.Done() + var lastTS uint64 + for i := 0; i < tsoRequestRound; i++ { + physical, logical, err := cli2.GetTS(context.Background()) + if err != nil { + // It can only be the context canceled error caused by the stale stream cleanup. + re.ErrorContains(err, "context canceled") + continue + } + re.NoError(err) + ts := tsoutil.ComposeTS(physical, logical) + re.Less(lastTS, ts) + lastTS = ts + // After requesting with the follower proxy, request with the leader directly. + physical, logical, err = cli1.GetTS(context.Background()) + re.NoError(err) + ts = tsoutil.ComposeTS(physical, logical) + re.Less(lastTS, ts) + lastTS = ts + } + // Ensure at least one request is successful. + re.NotEmpty(lastTS) + }() + } + wg.Wait() } func TestTSOFollowerProxyWithTSOService(t *testing.T) {