From 894481553c8910f4b79c2b7be86ac3bfe0c728de Mon Sep 17 00:00:00 2001 From: Bin Shi Date: Tue, 4 Jul 2023 18:28:02 -0700 Subject: [PATCH] Fix tso service discovery at the first time for NewClientWithAPIContext Signed-off-by: Bin Shi --- client/client.go | 32 ++++++++++++++------------------ client/pd_service_discovery.go | 18 ++++++++++++++---- client/tso_service_discovery.go | 19 ++++++++++++------- 3 files changed, 40 insertions(+), 29 deletions(-) diff --git a/client/client.go b/client/client.go index 5fbc5f9eb23..006e303c1cd 100644 --- a/client/client.go +++ b/client/client.go @@ -277,14 +277,6 @@ type serviceModeKeeper struct { tsoSvcDiscovery ServiceDiscovery } -func (k *serviceModeKeeper) SetKeyspaceID(keyspaceID uint32) { - k.Lock() - defer k.Unlock() - if k.serviceMode == pdpb.ServiceMode_API_SVC_MODE { - k.tsoSvcDiscovery.SetKeyspaceID(keyspaceID) - } -} - func (k *serviceModeKeeper) close() { k.Lock() defer k.Unlock() @@ -392,7 +384,7 @@ func createClientWithKeyspace( c.pdSvcDiscovery = newPDServiceDiscovery( clientCtx, clientCancel, &c.wg, c.setServiceMode, - keyspaceID, c.svrUrls, c.tlsCfg, c.option) + nil, keyspaceID, c.svrUrls, c.tlsCfg, c.option) if err := c.setup(); err != nil { c.cancel() return nil, err @@ -504,23 +496,27 @@ func newClientWithKeyspaceName( opt(c) } + updateKeyspaceIDCb := func() error { + if err := c.initRetry(c.loadKeyspaceMeta, keyspaceName); err != nil { + return err + } + // c.keyspaceID is the source of truth for keyspace id. + c.pdSvcDiscovery.(*pdServiceDiscovery).SetKeyspaceID(c.keyspaceID) + return nil + } + // Create a PD service discovery with null keyspace id, then query the real id wth the keyspace name, // finally update the keyspace id to the PD service discovery for the following interactions. c.pdSvcDiscovery = newPDServiceDiscovery( - clientCtx, clientCancel, &c.wg, c.setServiceMode, nullKeyspaceID, c.svrUrls, c.tlsCfg, c.option) + clientCtx, clientCancel, &c.wg, c.setServiceMode, updateKeyspaceIDCb, nullKeyspaceID, c.svrUrls, c.tlsCfg, c.option) if err := c.setup(); err != nil { c.cancel() return nil, err } - if err := c.initRetry(c.loadKeyspaceMeta, keyspaceName); err != nil { - return nil, err - } - // We call "c.pdSvcDiscovery.SetKeyspaceID(c.keyspaceID)" after service mode already switching to API mode - // and tso service discovery already initialized, so here we need to set the tso_service_discovery's keyspace id too. - c.pdSvcDiscovery.SetKeyspaceID(c.keyspaceID) - c.serviceModeKeeper.SetKeyspaceID(c.keyspaceID) log.Info("[pd] create pd client with endpoints and keyspace", - zap.Strings("pd-address", svrAddrs), zap.String("keyspace-name", keyspaceName), zap.Uint32("keyspace-id", c.keyspaceID)) + zap.Strings("pd-address", svrAddrs), + zap.String("keyspace-name", keyspaceName), + zap.Uint32("keyspace-id", c.keyspaceID)) return c, nil } diff --git a/client/pd_service_discovery.go b/client/pd_service_discovery.go index bd7ed31209a..4499c9e17c0 100644 --- a/client/pd_service_discovery.go +++ b/client/pd_service_discovery.go @@ -59,8 +59,6 @@ type ServiceDiscovery interface { GetClusterID() uint64 // GetKeyspaceID returns the ID of the keyspace GetKeyspaceID() uint32 - // SetKeyspaceID sets the ID of the keyspace - SetKeyspaceID(keyspaceID uint32) // GetKeyspaceGroupID returns the ID of the keyspace group GetKeyspaceGroupID() uint32 // DiscoverServiceURLs discovers the microservice with the specified type and returns the server urls. @@ -99,6 +97,7 @@ type ServiceDiscovery interface { AddServiceAddrsSwitchedCallback(callbacks ...func()) } +type updateKeyspaceIDFunc func() error type tsoLocalServAddrsUpdatedFunc func(map[string]string) error type tsoGlobalServAddrUpdatedFunc func(string) error @@ -149,8 +148,9 @@ type pdServiceDiscovery struct { cancel context.CancelFunc closeOnce sync.Once - keyspaceID uint32 - tlsCfg *tlsutil.TLSConfig + updateKeyspaceIDCb updateKeyspaceIDFunc + keyspaceID uint32 + tlsCfg *tlsutil.TLSConfig // Client option. option *option } @@ -160,6 +160,7 @@ func newPDServiceDiscovery( ctx context.Context, cancel context.CancelFunc, wg *sync.WaitGroup, serviceModeUpdateCb func(pdpb.ServiceMode), + updateKeyspaceIDCb updateKeyspaceIDFunc, keyspaceID uint32, urls []string, tlsCfg *tlsutil.TLSConfig, option *option, ) *pdServiceDiscovery { @@ -169,6 +170,7 @@ func newPDServiceDiscovery( cancel: cancel, wg: wg, serviceModeUpdateCb: serviceModeUpdateCb, + updateKeyspaceIDCb: updateKeyspaceIDCb, keyspaceID: keyspaceID, tlsCfg: tlsCfg, option: option, @@ -192,6 +194,14 @@ func (c *pdServiceDiscovery) Init() error { } log.Info("[pd] init cluster id", zap.Uint64("cluster-id", c.clusterID)) + // We need to update the keyspace ID before we discover and update the service mode + // so that TSO in API mode can be initialized with the correct keyspace ID. + if c.updateKeyspaceIDCb != nil { + if err := c.updateKeyspaceIDCb(); err != nil { + return err + } + } + if err := c.checkServiceModeChanged(); err != nil { log.Warn("[pd] failed to check service mode and will check later", zap.Error(err)) } diff --git a/client/tso_service_discovery.go b/client/tso_service_discovery.go index 6763e80976a..c35809a347b 100644 --- a/client/tso_service_discovery.go +++ b/client/tso_service_discovery.go @@ -277,11 +277,6 @@ func (c *tsoServiceDiscovery) GetKeyspaceID() uint32 { return c.keyspaceID.Load() } -// SetKeyspaceID sets the ID of the keyspace -func (c *tsoServiceDiscovery) SetKeyspaceID(keyspaceID uint32) { - c.keyspaceID.Store(keyspaceID) -} - // GetKeyspaceGroupID returns the ID of the keyspace group. If the keyspace group is unknown, // it returns the default keyspace group ID. func (c *tsoServiceDiscovery) GetKeyspaceGroupID() uint32 { @@ -429,12 +424,16 @@ func (c *tsoServiceDiscovery) updateMember() error { return err } + keyspaceID := c.GetKeyspaceID() var keyspaceGroup *tsopb.KeyspaceGroup if len(tsoServerAddr) > 0 { - keyspaceGroup, err = c.findGroupByKeyspaceID(c.GetKeyspaceID(), tsoServerAddr, updateMemberTimeout) + keyspaceGroup, err = c.findGroupByKeyspaceID(keyspaceID, tsoServerAddr, updateMemberTimeout) if err != nil { if c.tsoServerDiscovery.countFailure() { - log.Error("[tso] failed to find the keyspace group", errs.ZapError(err)) + log.Error("[tso] failed to find the keyspace group", + zap.Uint32("keyspace-id-in-request", keyspaceID), + zap.String("tso-server-addr", tsoServerAddr), + errs.ZapError(err)) } return err } @@ -448,6 +447,8 @@ func (c *tsoServiceDiscovery) updateMember() error { c.printFallbackLogOnce.Do(func() { log.Warn("[tso] no tso server address found,"+ " fallback to the legacy path to discover from etcd directly", + zap.Uint32("keyspace-id-in-request", keyspaceID), + zap.String("tso-server-addr", tsoServerAddr), zap.String("discovery-key", c.defaultDiscoveryKey)) }) addrs, err := c.discoverWithLegacyPath() @@ -487,6 +488,8 @@ func (c *tsoServiceDiscovery) updateMember() error { if primarySwitched := !strings.EqualFold(primaryAddr, c.getPrimaryAddr()); primarySwitched { if _, err := c.GetOrCreateGRPCConn(primaryAddr); err != nil { log.Warn("[tso] failed to connect the next primary", + zap.Uint32("keyspace-id-in-request", keyspaceID), + zap.String("tso-server-addr", tsoServerAddr), zap.String("next-primary", primaryAddr), errs.ZapError(err)) return err } @@ -497,6 +500,8 @@ func (c *tsoServiceDiscovery) updateMember() error { c.keyspaceGroupSD.update(keyspaceGroup, primaryAddr, secondaryAddrs, addrs) if primarySwitched { log.Info("[tso] updated keyspace group service discovery info", + zap.Uint32("keyspace-id-in-request", keyspaceID), + zap.String("tso-server-addr", tsoServerAddr), zap.String("keyspace-group-service", keyspaceGroup.String())) if err := c.afterPrimarySwitched(oldPrimary, primaryAddr); err != nil { return err