From c95fe99e0a00cff65bbeaf13afd9946fa61c2ec6 Mon Sep 17 00:00:00 2001 From: JmPotato Date: Wed, 24 Apr 2024 17:37:53 +0800 Subject: [PATCH 1/7] Improve the switching of TSO stream Signed-off-by: JmPotato --- client/tso_client.go | 50 ++++++----- client/tso_dispatcher.go | 102 ++++++++++++----------- tests/integrations/client/client_test.go | 35 ++++++++ 3 files changed, 120 insertions(+), 67 deletions(-) diff --git a/client/tso_client.go b/client/tso_client.go index 347d1f6ec0a..43d4f1dc297 100644 --- a/client/tso_client.go +++ b/client/tso_client.go @@ -178,6 +178,14 @@ func (c *tsoClient) getTSORequest(ctx context.Context, dcLocation string) *tsoRe return req } +func (c *tsoClient) getTSODispatcher(dcLocation string) (*tsoDispatcher, bool) { + dispatcher, ok := c.tsoDispatcher.Load(dcLocation) + if !ok { + return nil, false + } + return dispatcher.(*tsoDispatcher), true +} + // GetTSOAllocators returns {dc-location -> TSO allocator leader URL} connection map func (c *tsoClient) GetTSOAllocators() *sync.Map { return &c.tsoAllocators @@ -259,6 +267,7 @@ func (c *tsoClient) updateTSOGlobalServURL(url string) error { log.Info("[tso] switch dc tso global allocator serving url", zap.String("dc-location", globalDCLocation), zap.String("new-url", url)) + c.scheduleUpdateTSOConnectionCtxs() c.scheduleCheckTSODispatcher() return nil } @@ -338,33 +347,34 @@ func (c *tsoClient) tryConnectToTSO( connectionCtxs *sync.Map, ) error { var ( - networkErrNum uint64 - err error - stream tsoStream - url string - cc *grpc.ClientConn - ) - updateAndClear := func(newURL string, connectionCtx *tsoConnectionContext) { - if cc, loaded := connectionCtxs.LoadOrStore(newURL, connectionCtx); loaded { - // If the previous connection still exists, we should close it first. - cc.(*tsoConnectionContext).cancel() - connectionCtxs.Store(newURL, connectionCtx) + networkErrNum uint64 + err error + stream tsoStream + url string + cc *grpc.ClientConn + updateAndClear = func(newURL string, connectionCtx *tsoConnectionContext) { + // Only store the connectionCtx if it does not exist before. + connectionCtxs.LoadOrStore(newURL, connectionCtx) + // Remove all other connection contexts. + connectionCtxs.Range(func(url, cc any) bool { + if url.(string) != newURL { + cc.(*tsoConnectionContext).cancel() + connectionCtxs.Delete(url) + } + return true + }) } - connectionCtxs.Range(func(url, cc any) bool { - if url.(string) != newURL { - cc.(*tsoConnectionContext).cancel() - connectionCtxs.Delete(url) - } - return true - }) - } - // retry several times before falling back to the follower when the network problem happens + ) ticker := time.NewTicker(retryInterval) defer ticker.Stop() + // Retry several times before falling back to the follower when the network problem happens for i := 0; i < maxRetryTimes; i++ { c.svcDiscovery.ScheduleCheckMemberChanged() cc, url = c.GetTSOAllocatorClientConnByDCLocation(dc) + if _, ok := connectionCtxs.Load(url); ok { + return nil + } if cc != nil { cctx, cancel := context.WithCancel(dispatcherCtx) stream, err = c.tsoStreamBuilderFactory.makeBuilder(cc).build(cctx, cancel, c.option.timeout) diff --git a/client/tso_dispatcher.go b/client/tso_dispatcher.go index 7528293a733..103029011b3 100644 --- a/client/tso_dispatcher.go +++ b/client/tso_dispatcher.go @@ -44,8 +44,17 @@ type tsoDispatcher struct { tsoBatchController *tsoBatchController } +func (td *tsoDispatcher) close() { + td.dispatcherCancel() + td.tsoBatchController.clear() +} + +func (td *tsoDispatcher) push(request *tsoRequest) { + td.tsoBatchController.tsoRequestCh <- request +} + func (c *tsoClient) dispatchRequest(request *tsoRequest) (bool, error) { - dispatcher, ok := c.tsoDispatcher.Load(request.dcLocation) + dispatcher, ok := c.getTSODispatcher(request.dcLocation) if !ok { err := errs.ErrClientGetTSO.FastGenByArgs(fmt.Sprintf("unknown dc-location %s to the client", request.dcLocation)) log.Error("[tso] dispatch tso request error", zap.String("dc-location", request.dcLocation), errs.ZapError(err)) @@ -70,7 +79,7 @@ func (c *tsoClient) dispatchRequest(request *tsoRequest) (bool, error) { failpoint.Inject("delayDispatchTSORequest", func() { time.Sleep(time.Second) }) - dispatcher.(*tsoDispatcher).tsoBatchController.tsoRequestCh <- request + dispatcher.push(request) } // Check the contexts again to make sure the request is not been sent to a closed dispatcher. // Never retry on these conditions to prevent unexpected data race. @@ -89,9 +98,7 @@ func (c *tsoClient) dispatchRequest(request *tsoRequest) (bool, error) { func (c *tsoClient) closeTSODispatcher() { c.tsoDispatcher.Range(func(_, dispatcherInterface any) bool { if dispatcherInterface != nil { - dispatcher := dispatcherInterface.(*tsoDispatcher) - dispatcher.dispatcherCancel() - dispatcher.tsoBatchController.clear() + dispatcherInterface.(*tsoDispatcher).close() } return true }) @@ -115,7 +122,7 @@ func (c *tsoClient) updateTSODispatcher() { } if _, exist := c.GetTSOAllocators().Load(dcLocation); !exist { log.Info("[tso] delete unused tso dispatcher", zap.String("dc-location", dcLocation)) - dispatcher.(*tsoDispatcher).dispatcherCancel() + dispatcher.(*tsoDispatcher).close() c.tsoDispatcher.Delete(dcLocation) } return true @@ -319,6 +326,7 @@ func (c *tsoClient) handleDispatcher( // url -> connectionContext connectionCtxs sync.Map ) + // Clean up the connectionCtxs when the dispatcher exits. defer func() { log.Info("[tso] exit tso dispatcher", zap.String("dc-location", dc)) // Cancel all connections. @@ -330,51 +338,51 @@ func (c *tsoClient) handleDispatcher( tbc.clear() c.wg.Done() }() - // Call updateTSOConnectionCtxs once to init the connectionCtxs first. - c.updateTSOConnectionCtxs(dispatcherCtx, dc, &connectionCtxs) - // Only the Global TSO needs to watch the updateTSOConnectionCtxsCh to sense the - // change of the cluster when TSO Follower Proxy is enabled. - // TODO: support TSO Follower Proxy for the Local TSO. - if dc == globalDCLocation { - go func() { - var updateTicker = &time.Ticker{} - setNewUpdateTicker := func(ticker *time.Ticker) { - if updateTicker.C != nil { - updateTicker.Stop() - } - updateTicker = ticker + // Daemon goroutine to update the connectionCtxs periodically and handle the TSO Follower Proxy switch event. + go func() { + var updateTicker = &time.Ticker{} + setNewUpdateTicker := func(ticker *time.Ticker) { + if updateTicker.C != nil { + updateTicker.Stop() } - // Set to nil before returning to ensure that the existing ticker can be GC. - defer setNewUpdateTicker(nil) + updateTicker = ticker + } + // Set to nil before returning to ensure that the existing ticker can be GC. + defer setNewUpdateTicker(nil) - for { - select { - case <-dispatcherCtx.Done(): - return - case <-c.option.enableTSOFollowerProxyCh: - enableTSOFollowerProxy := c.option.getEnableTSOFollowerProxy() - log.Info("[tso] tso follower proxy status changed", - zap.String("dc-location", dc), - zap.Bool("enable", enableTSOFollowerProxy)) - if enableTSOFollowerProxy && updateTicker.C == nil { - // Because the TSO Follower Proxy is enabled, - // the periodic check needs to be performed. - setNewUpdateTicker(time.NewTicker(memberUpdateInterval)) - } else if !enableTSOFollowerProxy && updateTicker.C != nil { - // Because the TSO Follower Proxy is disabled, - // the periodic check needs to be turned off. - setNewUpdateTicker(&time.Ticker{}) - } else { - // The status of TSO Follower Proxy does not change, and updateTSOConnectionCtxs is not triggered - continue - } - case <-updateTicker.C: - case <-c.updateTSOConnectionCtxsCh: + for { + c.updateTSOConnectionCtxs(dispatcherCtx, dc, &connectionCtxs) + select { + case <-dispatcherCtx.Done(): + return + case <-c.option.enableTSOFollowerProxyCh: + // TODO: implement TSO Follower Proxy support for the Local TSO. + if dc != globalDCLocation { + continue } - c.updateTSOConnectionCtxs(dispatcherCtx, dc, &connectionCtxs) + enableTSOFollowerProxy := c.option.getEnableTSOFollowerProxy() + log.Info("[tso] tso follower proxy status changed", + zap.String("dc-location", dc), + zap.Bool("enable", enableTSOFollowerProxy)) + if enableTSOFollowerProxy && updateTicker.C == nil { + // Because the TSO Follower Proxy is enabled, + // the periodic check needs to be performed. + setNewUpdateTicker(time.NewTicker(memberUpdateInterval)) + } else if !enableTSOFollowerProxy && updateTicker.C != nil { + // Because the TSO Follower Proxy is disabled, + // the periodic check needs to be turned off. + setNewUpdateTicker(&time.Ticker{}) + } else { + // The status of TSO Follower Proxy does not change, and updateTSOConnectionCtxs is not triggered + continue + } + case <-updateTicker.C: + // Triggered periodically when the TSO Follower Proxy is enabled. + case <-c.updateTSOConnectionCtxsCh: + // Triggered by the Global TSO Allocator leader change. } - }() - } + } + }() // Loop through each batch of TSO requests and send them for processing. streamLoopTimer := time.NewTimer(c.option.timeout) diff --git a/tests/integrations/client/client_test.go b/tests/integrations/client/client_test.go index 10be418c029..dfe7a6980c7 100644 --- a/tests/integrations/client/client_test.go +++ b/tests/integrations/client/client_test.go @@ -26,6 +26,7 @@ import ( "strconv" "strings" "sync" + "sync/atomic" "testing" "time" @@ -248,6 +249,40 @@ func TestLeaderTransferAndMoveCluster(t *testing.T) { wg.Wait() } +func TestGetTSAfterTransferLeader(t *testing.T) { + re := require.New(t) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + cluster, err := tests.NewTestCluster(ctx, 2) + re.NoError(err) + endpoints := runServer(re, cluster) + leader := cluster.WaitLeader() + re.NotEmpty(leader) + defer cluster.Destroy() + + cli := setupCli(ctx, re, endpoints, pd.WithCustomTimeoutOption(10*time.Second)) + defer cli.Close() + + var leaderSwitched atomic.Bool + cli.GetServiceDiscovery().AddServingURLSwitchedCallback(func() { + leaderSwitched.Store(true) + }) + err = cluster.GetServer(leader).ResignLeader() + re.NoError(err) + newLeader := cluster.WaitLeader() + re.NotEmpty(newLeader) + re.NotEqual(leader, newLeader) + leader = cluster.WaitLeader() + re.NotEmpty(leader) + err = cli.GetServiceDiscovery().CheckMemberChanged() + re.NoError(err) + + testutil.Eventually(re, leaderSwitched.Load) + // The leader stream must be updated after the leader switch is sensed by the client. + _, _, err = cli.GetTS(context.TODO()) + re.NoError(err) +} + func TestTSOAllocatorLeader(t *testing.T) { re := require.New(t) ctx, cancel := context.WithCancel(context.Background()) From 6330a88600ab74efacedebf587b2da5749ea09b6 Mon Sep 17 00:00:00 2001 From: JmPotato Date: Wed, 24 Apr 2024 23:27:52 +0800 Subject: [PATCH 2/7] Refine the code Signed-off-by: JmPotato --- client/tso_client.go | 18 +++++++++------ client/tso_dispatcher.go | 49 +++++++++++++++++++--------------------- 2 files changed, 34 insertions(+), 33 deletions(-) diff --git a/client/tso_client.go b/client/tso_client.go index 43d4f1dc297..20d1fc19583 100644 --- a/client/tso_client.go +++ b/client/tso_client.go @@ -342,7 +342,7 @@ func (c *tsoClient) updateTSOConnectionCtxs(updaterCtx context.Context, dc strin // while a new daemon will be created also to switch back to a normal leader connection ASAP the // connection comes back to normal. func (c *tsoClient) tryConnectToTSO( - dispatcherCtx context.Context, + ctx context.Context, dc string, connectionCtxs *sync.Map, ) error { @@ -376,7 +376,7 @@ func (c *tsoClient) tryConnectToTSO( return nil } if cc != nil { - cctx, cancel := context.WithCancel(dispatcherCtx) + cctx, cancel := context.WithCancel(ctx) stream, err = c.tsoStreamBuilderFactory.makeBuilder(cc).build(cctx, cancel, c.option.timeout) failpoint.Inject("unreachableNetwork", func() { stream = nil @@ -402,7 +402,7 @@ func (c *tsoClient) tryConnectToTSO( networkErrNum++ } select { - case <-dispatcherCtx.Done(): + case <-ctx.Done(): return err case <-ticker.C: } @@ -419,14 +419,14 @@ func (c *tsoClient) tryConnectToTSO( } // create the follower stream - cctx, cancel := context.WithCancel(dispatcherCtx) + cctx, cancel := context.WithCancel(ctx) cctx = grpcutil.BuildForwardContext(cctx, forwardedHost) stream, err = c.tsoStreamBuilderFactory.makeBuilder(backupClientConn).build(cctx, cancel, c.option.timeout) if err == nil { forwardedHostTrim := trimHTTPPrefix(forwardedHost) addr := trimHTTPPrefix(backupURL) // the goroutine is used to check the network and change back to the original stream - go c.checkAllocator(dispatcherCtx, cancel, dc, forwardedHostTrim, addr, url, updateAndClear) + go c.checkAllocator(ctx, cancel, dc, forwardedHostTrim, addr, url, updateAndClear) requestForwarded.WithLabelValues(forwardedHostTrim, addr).Set(1) updateAndClear(backupURL, &tsoConnectionContext{backupURL, stream, cctx, cancel}) return nil @@ -439,7 +439,11 @@ func (c *tsoClient) tryConnectToTSO( // tryConnectToTSOWithProxy will create multiple streams to all the service endpoints to work as // a TSO proxy to reduce the pressure of the main serving service endpoint. -func (c *tsoClient) tryConnectToTSOWithProxy(dispatcherCtx context.Context, dc string, connectionCtxs *sync.Map) error { +func (c *tsoClient) tryConnectToTSOWithProxy( + ctx context.Context, + dc string, + connectionCtxs *sync.Map, +) error { tsoStreamBuilders := c.getAllTSOStreamBuilders() leaderAddr := c.svcDiscovery.GetServingURL() forwardedHost, ok := c.GetTSOAllocatorServingURLByDCLocation(dc) @@ -465,7 +469,7 @@ func (c *tsoClient) tryConnectToTSOWithProxy(dispatcherCtx context.Context, dc s } log.Info("[tso] try to create tso stream", zap.String("dc", dc), zap.String("addr", addr)) - cctx, cancel := context.WithCancel(dispatcherCtx) + cctx, cancel := context.WithCancel(ctx) // Do not proxy the leader client. if addr != leaderAddr { log.Info("[tso] use follower to forward tso stream to do the proxy", diff --git a/client/tso_dispatcher.go b/client/tso_dispatcher.go index 103029011b3..2e6b9babd73 100644 --- a/client/tso_dispatcher.go +++ b/client/tso_dispatcher.go @@ -283,20 +283,17 @@ func (c *tsoClient) checkTSODispatcher(dcLocation string) bool { func (c *tsoClient) createTSODispatcher(dcLocation string) { dispatcherCtx, dispatcherCancel := context.WithCancel(c.ctx) - dispatcher := &tsoDispatcher{ - dispatcherCancel: dispatcherCancel, - tsoBatchController: newTSOBatchController( - make(chan *tsoRequest, defaultMaxTSOBatchSize*2), - defaultMaxTSOBatchSize), - } + tsoBatchController := newTSOBatchController( + make(chan *tsoRequest, defaultMaxTSOBatchSize*2), + defaultMaxTSOBatchSize, + ) failpoint.Inject("shortDispatcherChannel", func() { - dispatcher = &tsoDispatcher{ - dispatcherCancel: dispatcherCancel, - tsoBatchController: newTSOBatchController( - make(chan *tsoRequest, 1), - defaultMaxTSOBatchSize), - } + tsoBatchController = newTSOBatchController( + make(chan *tsoRequest, 1), + defaultMaxTSOBatchSize, + ) }) + dispatcher := &tsoDispatcher{dispatcherCancel, tsoBatchController} if _, ok := c.tsoDispatcher.LoadOrStore(dcLocation, dispatcher); !ok { // Successfully stored the value. Start the following goroutine. @@ -313,7 +310,7 @@ func (c *tsoClient) createTSODispatcher(dcLocation string) { } func (c *tsoClient) handleDispatcher( - dispatcherCtx context.Context, + ctx context.Context, dc string, tbc *tsoBatchController, ) { @@ -351,9 +348,9 @@ func (c *tsoClient) handleDispatcher( defer setNewUpdateTicker(nil) for { - c.updateTSOConnectionCtxs(dispatcherCtx, dc, &connectionCtxs) + c.updateTSOConnectionCtxs(ctx, dc, &connectionCtxs) select { - case <-dispatcherCtx.Done(): + case <-ctx.Done(): return case <-c.option.enableTSOFollowerProxyCh: // TODO: implement TSO Follower Proxy support for the Local TSO. @@ -391,7 +388,7 @@ func (c *tsoClient) handleDispatcher( tsoBatchLoop: for { select { - case <-dispatcherCtx.Done(): + case <-ctx.Done(): return default: } @@ -399,7 +396,7 @@ tsoBatchLoop: maxBatchWaitInterval := c.option.getMaxTSOBatchWaitInterval() // Once the TSO requests are collected, must make sure they could be finished or revoked eventually, // otherwise the upper caller may get blocked on waiting for the results. - if err = tbc.fetchPendingRequests(dispatcherCtx, maxBatchWaitInterval); err != nil { + if err = tbc.fetchPendingRequests(ctx, maxBatchWaitInterval); err != nil { // Finish the collected requests if the fetch failed. tbc.finishCollectedRequests(0, 0, 0, errors.WithStack(err)) if err == context.Canceled { @@ -435,14 +432,14 @@ tsoBatchLoop: // Check stream and retry if necessary. if stream == nil { log.Info("[tso] tso stream is not ready", zap.String("dc", dc)) - if c.updateTSOConnectionCtxs(dispatcherCtx, dc, &connectionCtxs) { + if c.updateTSOConnectionCtxs(ctx, dc, &connectionCtxs) { continue streamChoosingLoop } timer := time.NewTimer(retryInterval) select { - case <-dispatcherCtx.Done(): + case <-ctx.Done(): // Finish the collected requests if the context is canceled. - tbc.finishCollectedRequests(0, 0, 0, errors.WithStack(dispatcherCtx.Err())) + tbc.finishCollectedRequests(0, 0, 0, errors.WithStack(ctx.Err())) timer.Stop() return case <-streamLoopTimer.C: @@ -479,9 +476,9 @@ tsoBatchLoop: tsDeadlineCh, ok = c.tsDeadline.Load(dc) } select { - case <-dispatcherCtx.Done(): + case <-ctx.Done(): // Finish the collected requests if the context is canceled. - tbc.finishCollectedRequests(0, 0, 0, errors.WithStack(dispatcherCtx.Err())) + tbc.finishCollectedRequests(0, 0, 0, errors.WithStack(ctx.Err())) return case tsDeadlineCh.(chan *deadline) <- dl: } @@ -491,7 +488,7 @@ tsoBatchLoop: // If error happens during tso stream handling, reset stream and run the next trial. if err != nil { select { - case <-dispatcherCtx.Done(): + case <-ctx.Done(): return default: } @@ -506,9 +503,9 @@ tsoBatchLoop: stream = nil // Because ScheduleCheckMemberChanged is asynchronous, if the leader changes, we better call `updateMember` ASAP. if IsLeaderChange(err) { - if err := bo.Exec(dispatcherCtx, c.svcDiscovery.CheckMemberChanged); err != nil { + if err := bo.Exec(ctx, c.svcDiscovery.CheckMemberChanged); err != nil { select { - case <-dispatcherCtx.Done(): + case <-ctx.Done(): return default: } @@ -518,7 +515,7 @@ tsoBatchLoop: // will cancel the current stream, then the EOF error caused by cancel() // should not trigger the updateTSOConnectionCtxs here. // So we should only call it when the leader changes. - c.updateTSOConnectionCtxs(dispatcherCtx, dc, &connectionCtxs) + c.updateTSOConnectionCtxs(ctx, dc, &connectionCtxs) } } } From 02329ac900e67348191cc18a81b3042d896fbcf5 Mon Sep 17 00:00:00 2001 From: JmPotato Date: Thu, 25 Apr 2024 11:24:02 +0800 Subject: [PATCH 3/7] Refine the comments Signed-off-by: JmPotato --- client/tso_client.go | 4 ++-- client/tso_dispatcher.go | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/client/tso_client.go b/client/tso_client.go index 20d1fc19583..550d302bcf6 100644 --- a/client/tso_client.go +++ b/client/tso_client.go @@ -353,9 +353,9 @@ func (c *tsoClient) tryConnectToTSO( url string cc *grpc.ClientConn updateAndClear = func(newURL string, connectionCtx *tsoConnectionContext) { - // Only store the connectionCtx if it does not exist before. + // Only store the `connectionCtx` if it does not exist before. connectionCtxs.LoadOrStore(newURL, connectionCtx) - // Remove all other connection contexts. + // Remove all other `connectionCtx`s. connectionCtxs.Range(func(url, cc any) bool { if url.(string) != newURL { cc.(*tsoConnectionContext).cancel() diff --git a/client/tso_dispatcher.go b/client/tso_dispatcher.go index 2e6b9babd73..9fc7596b236 100644 --- a/client/tso_dispatcher.go +++ b/client/tso_dispatcher.go @@ -122,8 +122,8 @@ func (c *tsoClient) updateTSODispatcher() { } if _, exist := c.GetTSOAllocators().Load(dcLocation); !exist { log.Info("[tso] delete unused tso dispatcher", zap.String("dc-location", dcLocation)) - dispatcher.(*tsoDispatcher).close() c.tsoDispatcher.Delete(dcLocation) + dispatcher.(*tsoDispatcher).close() } return true }) @@ -335,7 +335,7 @@ func (c *tsoClient) handleDispatcher( tbc.clear() c.wg.Done() }() - // Daemon goroutine to update the connectionCtxs periodically and handle the TSO Follower Proxy switch event. + // Daemon goroutine to update the connectionCtxs periodically and handle the `connectionCtxs` update event. go func() { var updateTicker = &time.Ticker{} setNewUpdateTicker := func(ticker *time.Ticker) { From 13c5217ebc8db55ca603a9b3a2b5e16dc67c5905 Mon Sep 17 00:00:00 2001 From: JmPotato Date: Thu, 25 Apr 2024 11:55:46 +0800 Subject: [PATCH 4/7] Extract connectionCtxsUpdater Signed-off-by: JmPotato --- client/tso_dispatcher.go | 97 ++++++++++++++++++++++------------------ 1 file changed, 53 insertions(+), 44 deletions(-) diff --git a/client/tso_dispatcher.go b/client/tso_dispatcher.go index 9fc7596b236..d14e9222d3e 100644 --- a/client/tso_dispatcher.go +++ b/client/tso_dispatcher.go @@ -336,50 +336,7 @@ func (c *tsoClient) handleDispatcher( c.wg.Done() }() // Daemon goroutine to update the connectionCtxs periodically and handle the `connectionCtxs` update event. - go func() { - var updateTicker = &time.Ticker{} - setNewUpdateTicker := func(ticker *time.Ticker) { - if updateTicker.C != nil { - updateTicker.Stop() - } - updateTicker = ticker - } - // Set to nil before returning to ensure that the existing ticker can be GC. - defer setNewUpdateTicker(nil) - - for { - c.updateTSOConnectionCtxs(ctx, dc, &connectionCtxs) - select { - case <-ctx.Done(): - return - case <-c.option.enableTSOFollowerProxyCh: - // TODO: implement TSO Follower Proxy support for the Local TSO. - if dc != globalDCLocation { - continue - } - enableTSOFollowerProxy := c.option.getEnableTSOFollowerProxy() - log.Info("[tso] tso follower proxy status changed", - zap.String("dc-location", dc), - zap.Bool("enable", enableTSOFollowerProxy)) - if enableTSOFollowerProxy && updateTicker.C == nil { - // Because the TSO Follower Proxy is enabled, - // the periodic check needs to be performed. - setNewUpdateTicker(time.NewTicker(memberUpdateInterval)) - } else if !enableTSOFollowerProxy && updateTicker.C != nil { - // Because the TSO Follower Proxy is disabled, - // the periodic check needs to be turned off. - setNewUpdateTicker(&time.Ticker{}) - } else { - // The status of TSO Follower Proxy does not change, and updateTSOConnectionCtxs is not triggered - continue - } - case <-updateTicker.C: - // Triggered periodically when the TSO Follower Proxy is enabled. - case <-c.updateTSOConnectionCtxsCh: - // Triggered by the Global TSO Allocator leader change. - } - } - }() + go c.connectionCtxsUpdater(ctx, dc, &connectionCtxs) // Loop through each batch of TSO requests and send them for processing. streamLoopTimer := time.NewTimer(c.option.timeout) @@ -521,6 +478,58 @@ tsoBatchLoop: } } +// updateTSOConnectionCtxs updates the `connectionCtxs` for the specified DC location regularly. +func (c *tsoClient) connectionCtxsUpdater( + ctx context.Context, + dc string, + connectionCtxs *sync.Map, +) { + log.Info("[tso] start tso connection contexts updater", zap.String("dc-location", dc)) + var updateTicker = &time.Ticker{} + setNewUpdateTicker := func(ticker *time.Ticker) { + if updateTicker.C != nil { + updateTicker.Stop() + } + updateTicker = ticker + } + // Set to nil before returning to ensure that the existing ticker can be GC. + defer setNewUpdateTicker(nil) + + for { + c.updateTSOConnectionCtxs(ctx, dc, connectionCtxs) + select { + case <-ctx.Done(): + log.Info("[tso] exit tso connection contexts updater", zap.String("dc-location", dc)) + return + case <-c.option.enableTSOFollowerProxyCh: + // TODO: implement TSO Follower Proxy support for the Local TSO. + if dc != globalDCLocation { + continue + } + enableTSOFollowerProxy := c.option.getEnableTSOFollowerProxy() + log.Info("[tso] tso follower proxy status changed", + zap.String("dc-location", dc), + zap.Bool("enable", enableTSOFollowerProxy)) + if enableTSOFollowerProxy && updateTicker.C == nil { + // Because the TSO Follower Proxy is enabled, + // the periodic check needs to be performed. + setNewUpdateTicker(time.NewTicker(memberUpdateInterval)) + } else if !enableTSOFollowerProxy && updateTicker.C != nil { + // Because the TSO Follower Proxy is disabled, + // the periodic check needs to be turned off. + setNewUpdateTicker(&time.Ticker{}) + } else { + // The status of TSO Follower Proxy does not change, and updateTSOConnectionCtxs is not triggered + continue + } + case <-updateTicker.C: + // Triggered periodically when the TSO Follower Proxy is enabled. + case <-c.updateTSOConnectionCtxsCh: + // Triggered by the leader/follower change. + } + } +} + // chooseStream uses the reservoir sampling algorithm to randomly choose a connection. // connectionCtxs will only have only one stream to choose when the TSO Follower Proxy is off. func chooseStream(connectionCtxs *sync.Map) (connectionCtx *tsoConnectionContext) { From 7c11e8ef45afab640e90adbb1ce2578d7e10289d Mon Sep 17 00:00:00 2001 From: JmPotato Date: Thu, 25 Apr 2024 16:39:42 +0800 Subject: [PATCH 5/7] Rename some context variables Signed-off-by: JmPotato --- client/tso_dispatcher.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/client/tso_dispatcher.go b/client/tso_dispatcher.go index d14e9222d3e..234e7c26bae 100644 --- a/client/tso_dispatcher.go +++ b/client/tso_dispatcher.go @@ -222,7 +222,7 @@ func (c *tsoClient) tsoDispatcherCheckLoop() { } func (c *tsoClient) checkAllocator( - dispatcherCtx context.Context, + ctx context.Context, forwardCancel context.CancelFunc, dc, forwardedHostTrim, addr, url string, updateAndClear func(newAddr string, connectionCtx *tsoConnectionContext)) { @@ -245,7 +245,7 @@ func (c *tsoClient) checkAllocator( healthCli = healthpb.NewHealthClient(cc) } if healthCli != nil { - healthCtx, healthCancel := context.WithTimeout(dispatcherCtx, c.option.timeout) + healthCtx, healthCancel := context.WithTimeout(ctx, c.option.timeout) resp, err := healthCli.Check(healthCtx, &healthpb.HealthCheckRequest{Service: ""}) failpoint.Inject("unreachableNetwork", func() { resp.Status = healthpb.HealthCheckResponse_UNKNOWN @@ -253,7 +253,7 @@ func (c *tsoClient) checkAllocator( healthCancel() if err == nil && resp.GetStatus() == healthpb.HealthCheckResponse_SERVING { // create a stream of the original allocator - cctx, cancel := context.WithCancel(dispatcherCtx) + cctx, cancel := context.WithCancel(ctx) stream, err := c.tsoStreamBuilderFactory.makeBuilder(cc).build(cctx, cancel, c.option.timeout) if err == nil && stream != nil { log.Info("[tso] recover the original tso stream since the network has become normal", zap.String("dc", dc), zap.String("url", url)) @@ -263,7 +263,7 @@ func (c *tsoClient) checkAllocator( } } select { - case <-dispatcherCtx.Done(): + case <-ctx.Done(): return case <-ticker.C: // To ensure we can get the latest allocator leader From 583cea7a4450099f81a99e39963e5684d6030c7c Mon Sep 17 00:00:00 2001 From: JmPotato Date: Thu, 25 Apr 2024 17:18:44 +0800 Subject: [PATCH 6/7] Update the comment Signed-off-by: JmPotato --- client/tso_dispatcher.go | 1 - 1 file changed, 1 deletion(-) diff --git a/client/tso_dispatcher.go b/client/tso_dispatcher.go index 234e7c26bae..5837fed8b3b 100644 --- a/client/tso_dispatcher.go +++ b/client/tso_dispatcher.go @@ -519,7 +519,6 @@ func (c *tsoClient) connectionCtxsUpdater( // the periodic check needs to be turned off. setNewUpdateTicker(&time.Ticker{}) } else { - // The status of TSO Follower Proxy does not change, and updateTSOConnectionCtxs is not triggered continue } case <-updateTicker.C: From 435b7f865d7c5e906df55166a8d2e9602e84ec22 Mon Sep 17 00:00:00 2001 From: JmPotato Date: Sun, 28 Apr 2024 15:32:42 +0800 Subject: [PATCH 7/7] Address the comments Signed-off-by: JmPotato --- client/tso_client.go | 2 +- client/tso_dispatcher.go | 18 +++++------------- 2 files changed, 6 insertions(+), 14 deletions(-) diff --git a/client/tso_client.go b/client/tso_client.go index 550d302bcf6..e3bdb835901 100644 --- a/client/tso_client.go +++ b/client/tso_client.go @@ -180,7 +180,7 @@ func (c *tsoClient) getTSORequest(ctx context.Context, dcLocation string) *tsoRe func (c *tsoClient) getTSODispatcher(dcLocation string) (*tsoDispatcher, bool) { dispatcher, ok := c.tsoDispatcher.Load(dcLocation) - if !ok { + if !ok || dispatcher == nil { return nil, false } return dispatcher.(*tsoDispatcher), true diff --git a/client/tso_dispatcher.go b/client/tso_dispatcher.go index 5837fed8b3b..c82ec777eca 100644 --- a/client/tso_dispatcher.go +++ b/client/tso_dispatcher.go @@ -108,7 +108,7 @@ func (c *tsoClient) updateTSODispatcher() { // Set up the new TSO dispatcher and batch controller. c.GetTSOAllocators().Range(func(dcLocationKey, _ any) bool { dcLocation := dcLocationKey.(string) - if !c.checkTSODispatcher(dcLocation) { + if _, ok := c.getTSODispatcher(dcLocation); !ok { c.createTSODispatcher(dcLocation) } return true @@ -273,14 +273,6 @@ func (c *tsoClient) checkAllocator( } } -func (c *tsoClient) checkTSODispatcher(dcLocation string) bool { - dispatcher, ok := c.tsoDispatcher.Load(dcLocation) - if !ok || dispatcher == nil { - return false - } - return true -} - func (c *tsoClient) createTSODispatcher(dcLocation string) { dispatcherCtx, dispatcherCancel := context.WithCancel(c.ctx) tsoBatchController := newTSOBatchController( @@ -479,11 +471,15 @@ tsoBatchLoop: } // updateTSOConnectionCtxs updates the `connectionCtxs` for the specified DC location regularly. +// TODO: implement support for the Local TSO. func (c *tsoClient) connectionCtxsUpdater( ctx context.Context, dc string, connectionCtxs *sync.Map, ) { + if dc != globalDCLocation { + return + } log.Info("[tso] start tso connection contexts updater", zap.String("dc-location", dc)) var updateTicker = &time.Ticker{} setNewUpdateTicker := func(ticker *time.Ticker) { @@ -502,10 +498,6 @@ func (c *tsoClient) connectionCtxsUpdater( log.Info("[tso] exit tso connection contexts updater", zap.String("dc-location", dc)) return case <-c.option.enableTSOFollowerProxyCh: - // TODO: implement TSO Follower Proxy support for the Local TSO. - if dc != globalDCLocation { - continue - } enableTSOFollowerProxy := c.option.getEnableTSOFollowerProxy() log.Info("[tso] tso follower proxy status changed", zap.String("dc-location", dc),