Skip to content

Commit

Permalink
Remove the unnecessary RWLock in client
Browse files Browse the repository at this point in the history
Signed-off-by: JmPotato <ghzpotato@gmail.com>
  • Loading branch information
JmPotato committed Mar 21, 2023
1 parent 695d6b1 commit 254ac85
Show file tree
Hide file tree
Showing 4 changed files with 89 additions and 77 deletions.
88 changes: 41 additions & 47 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -240,20 +240,15 @@ func WithMaxErrorRetry(count int) ClientOption {

var _ Client = (*client)(nil)

// serviceModeKeeper is for service mode switching
type serviceModeKeeper struct {
svcModeMutex sync.RWMutex
serviceMode pdpb.ServiceMode
tsoClient atomic.Value
tsoSvcDiscovery ServiceDiscovery
}

type client struct {
keyspaceID uint32
svrUrls []string
pdSvcDiscovery ServiceDiscovery
tokenDispatcher *tokenDispatcher
serviceModeKeeper

serviceMode pdpb.ServiceMode
tsoClient atomic.Value // *tsoClient
tsoSvcDiscovery ServiceDiscovery

// For internal usage.
updateTokenConnectionCh chan struct{}
Expand Down Expand Up @@ -364,57 +359,56 @@ func (c *client) Close() {
}

func (c *client) setServiceMode(newMode pdpb.ServiceMode) {
c.svcModeMutex.Lock()
defer c.svcModeMutex.Unlock()

if newMode == c.serviceMode {
return
}

log.Info("changing service mode", zap.String("old-mode", pdpb.ServiceMode_name[int32(c.serviceMode)]),
zap.String("new-mode", pdpb.ServiceMode_name[int32(newMode)]))

if newMode == pdpb.ServiceMode_UNKNOWN_SVC_MODE {
log.Warn("intend to switch to unknown service mode. do nothing")
return
}

var newTSOCli *tsoClient
tsoSvcDiscovery := c.tsoSvcDiscovery
ctx, cancel := context.WithCancel(c.ctx)

if newMode == pdpb.ServiceMode_PD_SVC_MODE {
newTSOCli = newTSOClient(ctx, cancel, c.option, c.keyspaceID,
c.pdSvcDiscovery, c.pdSvcDiscovery.(tsoAllocatorEventSource), &pdTSOStreamBuilderFactory{})
newTSOCli.Setup()
} else {
tsoSvcDiscovery = newTSOServiceDiscovery(ctx, cancel, MetaStorageClient(c),
log.Info("[pd] changing service mode",
zap.String("old-mode", c.serviceMode.String()),
zap.String("new-mode", newMode.String()))
// Re-create a new TSO client.
var (
newTSOCli *tsoClient
newTSOSvcDiscovery ServiceDiscovery
)
switch newMode {
case pdpb.ServiceMode_PD_SVC_MODE:
newTSOCli = newTSOClient(c.ctx, c.option, c.keyspaceID,
c.pdSvcDiscovery, &pdTSOStreamBuilderFactory{})
case pdpb.ServiceMode_API_SVC_MODE:
newTSOSvcDiscovery = newTSOServiceDiscovery(c.ctx, MetaStorageClient(c),
c.GetClusterID(c.ctx), c.keyspaceID, c.svrUrls, c.tlsCfg, c.option)
newTSOCli = newTSOClient(ctx, cancel, c.option, c.keyspaceID,
tsoSvcDiscovery, tsoSvcDiscovery.(tsoAllocatorEventSource), &tsoTSOStreamBuilderFactory{})
if err := tsoSvcDiscovery.Init(); err != nil {
cancel()
log.Error("failed to initialize tso service discovery. keep the current service mode",
zap.Strings("svr-urls", c.svrUrls), zap.String("current-mode", pdpb.ServiceMode_name[int32(c.serviceMode)]), zap.Error(err))
newTSOCli = newTSOClient(c.ctx, c.option, c.keyspaceID,
newTSOSvcDiscovery, &tsoTSOStreamBuilderFactory{})
if err := newTSOSvcDiscovery.Init(); err != nil {
log.Error("[pd] failed to initialize tso service discovery. keep the current service mode",
zap.Strings("svr-urls", c.svrUrls), zap.String("current-mode", c.serviceMode.String()), zap.Error(err))
return
}
newTSOCli.Setup()
case pdpb.ServiceMode_UNKNOWN_SVC_MODE:
log.Warn("[pd] intend to switch to unknown service mode, just return")
return
}
newTSOCli.Setup()

// Replace the old TSO client.
oldTSOClient := c.getTSOClient()
c.tsoClient.Store(newTSOCli)
oldTSOClient.Close()

// cleanup the old tso client
if oldTSOCli := c.getTSOClient(); oldTSOCli != nil {
oldTSOCli.Close()
oldTSOSvcDiscovery := c.tsoSvcDiscovery
// Set the new TSO service discovery if needed.
if newTSOSvcDiscovery != nil {
c.tsoSvcDiscovery = newTSOSvcDiscovery
}
if c.serviceMode == pdpb.ServiceMode_API_SVC_MODE {
// Close the old TSO service discovery safely after the old client is closed.
if oldTSOSvcDiscovery != nil {
c.tsoSvcDiscovery.Close()
}

c.tsoSvcDiscovery = tsoSvcDiscovery
c.tsoClient.Store(newTSOCli)

log.Info("service mode changed", zap.String("old-mode", pdpb.ServiceMode_name[int32(c.serviceMode)]),
zap.String("new-mode", pdpb.ServiceMode_name[int32(newMode)]))
c.serviceMode = newMode
log.Info("[pd] service mode changed",
zap.String("old-mode", c.serviceMode.String()),
zap.String("new-mode", newMode.String()))
}

func (c *client) getTSOClient() *tsoClient {
Expand Down
60 changes: 34 additions & 26 deletions client/pd_service_discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,9 +138,13 @@ type pdServiceDiscovery struct {
option *option
}

// newPDServiceDiscovery returns a new baseClient.
func newPDServiceDiscovery(ctx context.Context, cancel context.CancelFunc, wg *sync.WaitGroup,
serviceModeUpdateCb func(pdpb.ServiceMode), urls []string, tlsCfg *tlsutil.TLSConfig, option *option) *pdServiceDiscovery {
// newPDServiceDiscovery returns a new PD service discovery-based client.
func newPDServiceDiscovery(
ctx context.Context, cancel context.CancelFunc,
wg *sync.WaitGroup,
serviceModeUpdateCb func(pdpb.ServiceMode),
urls []string, tlsCfg *tlsutil.TLSConfig, option *option,
) *pdServiceDiscovery {
pdsd := &pdServiceDiscovery{
checkMembershipCh: make(chan struct{}, 1),
ctx: ctx,
Expand All @@ -155,26 +159,27 @@ func newPDServiceDiscovery(ctx context.Context, cancel context.CancelFunc, wg *s
}

func (c *pdServiceDiscovery) Init() error {
if !c.isInitialized {
if err := c.initRetry(c.initClusterID); err != nil {
c.cancel()
return err
}
if err := c.initRetry(c.updateMember); err != nil {
c.cancel()
return err
}
log.Info("[pd] init cluster id", zap.Uint64("cluster-id", c.clusterID))
if c.isInitialized {
return nil
}

c.updateServiceMode()
if err := c.initRetry(c.initClusterID); err != nil {
c.cancel()
return err
}
if err := c.initRetry(c.updateMember); err != nil {
c.cancel()
return err
}
log.Info("[pd] init cluster id", zap.Uint64("cluster-id", c.clusterID))

c.wg.Add(2)
go c.updateMemberLoop()
go c.updateServiceModeLoop()
c.updateServiceMode()

c.isInitialized = true
}
c.wg.Add(2)
go c.updateMemberLoop()
go c.updateServiceModeLoop()

c.isInitialized = true
return nil
}

Expand All @@ -198,13 +203,15 @@ func (c *pdServiceDiscovery) updateMemberLoop() {

ctx, cancel := context.WithCancel(c.ctx)
defer cancel()
ticker := time.NewTicker(memberUpdateInterval)
defer ticker.Stop()

for {
select {
case <-c.checkMembershipCh:
case <-time.After(memberUpdateInterval):
case <-ctx.Done():
return
case <-ticker.C:
case <-c.checkMembershipCh:
}
failpoint.Inject("skipUpdateMember", func() {
failpoint.Continue()
Expand All @@ -220,25 +227,26 @@ func (c *pdServiceDiscovery) updateServiceModeLoop() {

ctx, cancel := context.WithCancel(c.ctx)
defer cancel()
ticker := time.NewTicker(serviceModeUpdateInterval)
defer ticker.Stop()

for {
select {
case <-time.After(serviceModeUpdateInterval):
case <-ctx.Done():
return
case <-ticker.C:
}

c.updateServiceMode()
}
}

// Close releases all resources
// Close releases all resources.
func (c *pdServiceDiscovery) Close() {
c.closeOnce.Do(func() {
log.Info("close pd service discovery")
log.Info("[pd] close pd service discovery client")
c.clientConns.Range(func(key, cc interface{}) bool {
if err := cc.(*grpc.ClientConn).Close(); err != nil {
log.Error("[pd] failed to close gRPC clientConn", errs.ZapError(errs.ErrCloseGRPCConn, err))
log.Error("[pd] failed to close grpc clientConn", errs.ZapError(errs.ErrCloseGRPCConn, err))
}
c.clientConns.Delete(key)
return true
Expand Down
11 changes: 9 additions & 2 deletions client/tso_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,8 +91,11 @@ type tsoClient struct {
}

// newTSOClient returns a new TSO client.
func newTSOClient(ctx context.Context, cancel context.CancelFunc, option *option, keyspaceID uint32,
svcDiscovery ServiceDiscovery, eventSrc tsoAllocatorEventSource, factory tsoStreamBuilderFactory) *tsoClient {
func newTSOClient(
ctx context.Context, option *option, keyspaceID uint32,
svcDiscovery ServiceDiscovery, factory tsoStreamBuilderFactory,
) *tsoClient {
ctx, cancel := context.WithCancel(ctx)
c := &tsoClient{
ctx: ctx,
cancel: cancel,
Expand All @@ -105,6 +108,7 @@ func newTSOClient(ctx context.Context, cancel context.CancelFunc, option *option
updateTSOConnectionCtxsCh: make(chan struct{}, 1),
}

eventSrc := svcDiscovery.(tsoAllocatorEventSource)
eventSrc.SetTSOLocalServAddrsUpdatedCallback(c.updateTSOLocalServAddrs)
eventSrc.SetTSOGlobalServAddrUpdatedCallback(c.updateTSOGlobalServAddr)
c.svcDiscovery.AddServiceAddrsSwitchedCallback(c.scheduleUpdateTSOConnectionCtxs)
Expand All @@ -124,6 +128,9 @@ func (c *tsoClient) Setup() {

// Close closes the TSO client
func (c *tsoClient) Close() {
if c == nil {
return
}
log.Info("closing tso client")

c.cancel()
Expand Down
7 changes: 5 additions & 2 deletions client/tso_service_discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,8 +79,11 @@ type tsoServiceDiscovery struct {
}

// newTSOServiceDiscovery returns a new client-side service discovery for the independent TSO service.
func newTSOServiceDiscovery(ctx context.Context, cancel context.CancelFunc, metacli MetaStorageClient,
clusterID uint64, keyspaceID uint32, urls []string, tlsCfg *tlsutil.TLSConfig, option *option) ServiceDiscovery {
func newTSOServiceDiscovery(
ctx context.Context, metacli MetaStorageClient,
clusterID uint64, keyspaceID uint32, urls []string, tlsCfg *tlsutil.TLSConfig, option *option,
) ServiceDiscovery {
ctx, cancel := context.WithCancel(ctx)
c := &tsoServiceDiscovery{
ctx: ctx,
cancel: cancel,
Expand Down

0 comments on commit 254ac85

Please sign in to comment.