Skip to content

Commit

Permalink
Extract connectionCtxsUpdater
Browse files Browse the repository at this point in the history
Signed-off-by: JmPotato <ghzpotato@gmail.com>
  • Loading branch information
JmPotato committed Apr 25, 2024
1 parent bbd1e45 commit 4553b7f
Showing 1 changed file with 52 additions and 44 deletions.
96 changes: 52 additions & 44 deletions client/tso_dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -336,50 +336,7 @@ func (c *tsoClient) handleDispatcher(
c.wg.Done()
}()
// Daemon goroutine to update the connectionCtxs periodically and handle the `connectionCtxs` update event.
go func() {
var updateTicker = &time.Ticker{}
setNewUpdateTicker := func(ticker *time.Ticker) {
if updateTicker.C != nil {
updateTicker.Stop()
}
updateTicker = ticker
}
// Set to nil before returning to ensure that the existing ticker can be GC.
defer setNewUpdateTicker(nil)

for {
c.updateTSOConnectionCtxs(ctx, dc, &connectionCtxs)
select {
case <-ctx.Done():
return
case <-c.option.enableTSOFollowerProxyCh:
// TODO: implement TSO Follower Proxy support for the Local TSO.
if dc != globalDCLocation {
continue
}
enableTSOFollowerProxy := c.option.getEnableTSOFollowerProxy()
log.Info("[tso] tso follower proxy status changed",
zap.String("dc-location", dc),
zap.Bool("enable", enableTSOFollowerProxy))
if enableTSOFollowerProxy && updateTicker.C == nil {
// Because the TSO Follower Proxy is enabled,
// the periodic check needs to be performed.
setNewUpdateTicker(time.NewTicker(memberUpdateInterval))
} else if !enableTSOFollowerProxy && updateTicker.C != nil {
// Because the TSO Follower Proxy is disabled,
// the periodic check needs to be turned off.
setNewUpdateTicker(&time.Ticker{})
} else {
// The status of TSO Follower Proxy does not change, and updateTSOConnectionCtxs is not triggered
continue
}
case <-updateTicker.C:
// Triggered periodically when the TSO Follower Proxy is enabled.
case <-c.updateTSOConnectionCtxsCh:
// Triggered by the Global TSO Allocator leader change.
}
}
}()
go c.connectionCtxsUpdater(ctx, dc, &connectionCtxs)

// Loop through each batch of TSO requests and send them for processing.
streamLoopTimer := time.NewTimer(c.option.timeout)
Expand Down Expand Up @@ -521,6 +478,57 @@ tsoBatchLoop:
}
}

func (c *tsoClient) connectionCtxsUpdater(
ctx context.Context,
dc string,
connectionCtxs *sync.Map,
) {
log.Info("[tso] start tso connection contexts updater", zap.String("dc-location", dc))
var updateTicker = &time.Ticker{}
setNewUpdateTicker := func(ticker *time.Ticker) {
if updateTicker.C != nil {
updateTicker.Stop()
}
updateTicker = ticker
}
// Set to nil before returning to ensure that the existing ticker can be GC.
defer setNewUpdateTicker(nil)

for {
c.updateTSOConnectionCtxs(ctx, dc, connectionCtxs)
select {
case <-ctx.Done():
log.Info("[tso] exit tso connection contexts updater", zap.String("dc-location", dc))
return
case <-c.option.enableTSOFollowerProxyCh:
// TODO: implement TSO Follower Proxy support for the Local TSO.
if dc != globalDCLocation {
continue
}
enableTSOFollowerProxy := c.option.getEnableTSOFollowerProxy()
log.Info("[tso] tso follower proxy status changed",
zap.String("dc-location", dc),
zap.Bool("enable", enableTSOFollowerProxy))
if enableTSOFollowerProxy && updateTicker.C == nil {
// Because the TSO Follower Proxy is enabled,
// the periodic check needs to be performed.
setNewUpdateTicker(time.NewTicker(memberUpdateInterval))
} else if !enableTSOFollowerProxy && updateTicker.C != nil {
// Because the TSO Follower Proxy is disabled,
// the periodic check needs to be turned off.
setNewUpdateTicker(&time.Ticker{})
} else {
// The status of TSO Follower Proxy does not change, and updateTSOConnectionCtxs is not triggered
continue
}
case <-updateTicker.C:
// Triggered periodically when the TSO Follower Proxy is enabled.
case <-c.updateTSOConnectionCtxsCh:
// Triggered by the Global TSO Allocator leader change.
}
}
}

// chooseStream uses the reservoir sampling algorithm to randomly choose a connection.
// connectionCtxs will only have only one stream to choose when the TSO Follower Proxy is off.
func chooseStream(connectionCtxs *sync.Map) (connectionCtx *tsoConnectionContext) {
Expand Down

0 comments on commit 4553b7f

Please sign in to comment.