Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

client/tso: improve the switching of TSO stream #8123

Merged
merged 7 commits into from
Apr 28, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
68 changes: 41 additions & 27 deletions client/tso_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,14 @@ func (c *tsoClient) getTSORequest(ctx context.Context, dcLocation string) *tsoRe
return req
}

func (c *tsoClient) getTSODispatcher(dcLocation string) (*tsoDispatcher, bool) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

does this function need to be put in tso_dispatcher.go?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here is the explain: #8121 (comment)

dispatcher, ok := c.tsoDispatcher.Load(dcLocation)
if !ok || dispatcher == nil {
return nil, false
}
return dispatcher.(*tsoDispatcher), true
}

// GetTSOAllocators returns {dc-location -> TSO allocator leader URL} connection map
func (c *tsoClient) GetTSOAllocators() *sync.Map {
return &c.tsoAllocators
Expand Down Expand Up @@ -259,6 +267,7 @@ func (c *tsoClient) updateTSOGlobalServURL(url string) error {
log.Info("[tso] switch dc tso global allocator serving url",
zap.String("dc-location", globalDCLocation),
zap.String("new-url", url))
c.scheduleUpdateTSOConnectionCtxs()
c.scheduleCheckTSODispatcher()
HuSharp marked this conversation as resolved.
Show resolved Hide resolved
return nil
}
Expand Down Expand Up @@ -333,40 +342,41 @@ func (c *tsoClient) updateTSOConnectionCtxs(updaterCtx context.Context, dc strin
// while a new daemon will be created also to switch back to a normal leader connection ASAP the
// connection comes back to normal.
func (c *tsoClient) tryConnectToTSO(
dispatcherCtx context.Context,
ctx context.Context,
dc string,
connectionCtxs *sync.Map,
) error {
var (
networkErrNum uint64
err error
stream tsoStream
url string
cc *grpc.ClientConn
)
updateAndClear := func(newURL string, connectionCtx *tsoConnectionContext) {
if cc, loaded := connectionCtxs.LoadOrStore(newURL, connectionCtx); loaded {
// If the previous connection still exists, we should close it first.
cc.(*tsoConnectionContext).cancel()
connectionCtxs.Store(newURL, connectionCtx)
networkErrNum uint64
err error
stream tsoStream
url string
cc *grpc.ClientConn
updateAndClear = func(newURL string, connectionCtx *tsoConnectionContext) {
// Only store the `connectionCtx` if it does not exist before.
connectionCtxs.LoadOrStore(newURL, connectionCtx)
// Remove all other `connectionCtx`s.
connectionCtxs.Range(func(url, cc any) bool {
if url.(string) != newURL {
cc.(*tsoConnectionContext).cancel()
connectionCtxs.Delete(url)
}
return true
})
}
connectionCtxs.Range(func(url, cc any) bool {
if url.(string) != newURL {
cc.(*tsoConnectionContext).cancel()
connectionCtxs.Delete(url)
}
return true
})
}
// retry several times before falling back to the follower when the network problem happens
)

ticker := time.NewTicker(retryInterval)
defer ticker.Stop()
// Retry several times before falling back to the follower when the network problem happens
for i := 0; i < maxRetryTimes; i++ {
c.svcDiscovery.ScheduleCheckMemberChanged()
cc, url = c.GetTSOAllocatorClientConnByDCLocation(dc)
if _, ok := connectionCtxs.Load(url); ok {
return nil
}
if cc != nil {
cctx, cancel := context.WithCancel(dispatcherCtx)
cctx, cancel := context.WithCancel(ctx)
stream, err = c.tsoStreamBuilderFactory.makeBuilder(cc).build(cctx, cancel, c.option.timeout)
failpoint.Inject("unreachableNetwork", func() {
stream = nil
Expand All @@ -392,7 +402,7 @@ func (c *tsoClient) tryConnectToTSO(
networkErrNum++
}
select {
case <-dispatcherCtx.Done():
case <-ctx.Done():
return err
case <-ticker.C:
}
Expand All @@ -409,14 +419,14 @@ func (c *tsoClient) tryConnectToTSO(
}

// create the follower stream
cctx, cancel := context.WithCancel(dispatcherCtx)
cctx, cancel := context.WithCancel(ctx)
cctx = grpcutil.BuildForwardContext(cctx, forwardedHost)
stream, err = c.tsoStreamBuilderFactory.makeBuilder(backupClientConn).build(cctx, cancel, c.option.timeout)
if err == nil {
forwardedHostTrim := trimHTTPPrefix(forwardedHost)
addr := trimHTTPPrefix(backupURL)
// the goroutine is used to check the network and change back to the original stream
go c.checkAllocator(dispatcherCtx, cancel, dc, forwardedHostTrim, addr, url, updateAndClear)
go c.checkAllocator(ctx, cancel, dc, forwardedHostTrim, addr, url, updateAndClear)
requestForwarded.WithLabelValues(forwardedHostTrim, addr).Set(1)
updateAndClear(backupURL, &tsoConnectionContext{backupURL, stream, cctx, cancel})
return nil
Expand All @@ -429,7 +439,11 @@ func (c *tsoClient) tryConnectToTSO(

// tryConnectToTSOWithProxy will create multiple streams to all the service endpoints to work as
// a TSO proxy to reduce the pressure of the main serving service endpoint.
func (c *tsoClient) tryConnectToTSOWithProxy(dispatcherCtx context.Context, dc string, connectionCtxs *sync.Map) error {
func (c *tsoClient) tryConnectToTSOWithProxy(
ctx context.Context,
dc string,
connectionCtxs *sync.Map,
) error {
tsoStreamBuilders := c.getAllTSOStreamBuilders()
leaderAddr := c.svcDiscovery.GetServingURL()
forwardedHost, ok := c.GetTSOAllocatorServingURLByDCLocation(dc)
Expand All @@ -455,7 +469,7 @@ func (c *tsoClient) tryConnectToTSOWithProxy(dispatcherCtx context.Context, dc s
}
log.Info("[tso] try to create tso stream",
zap.String("dc", dc), zap.String("addr", addr))
cctx, cancel := context.WithCancel(dispatcherCtx)
cctx, cancel := context.WithCancel(ctx)
// Do not proxy the leader client.
if addr != leaderAddr {
log.Info("[tso] use follower to forward tso stream to do the proxy",
Expand Down
Loading