From 0e1f37fc59b4424eaa145d56482aa6ae44f950f1 Mon Sep 17 00:00:00 2001 From: Bin Shi <39923490+binshi-bing@users.noreply.github.com> Date: Tue, 16 May 2023 17:45:18 +0800 Subject: [PATCH] TSO microservice discovery fallback path shouldn't call FindGroupByKeyspaceID (#6473) close tikv/pd#6472 TSO microservice discovery fallback path shouldn't call FindGroupByKeyspaceID Signed-off-by: Bin Shi --- client/tso_service_discovery.go | 85 ++++++++++++++++++--------- tests/integrations/tso/client_test.go | 37 ++++++------ 2 files changed, 78 insertions(+), 44 deletions(-) diff --git a/client/tso_service_discovery.go b/client/tso_service_discovery.go index ce59de84c9b..d92c64b33b3 100644 --- a/client/tso_service_discovery.go +++ b/client/tso_service_discovery.go @@ -17,6 +17,7 @@ package pd import ( "context" "fmt" + "reflect" "strings" "sync" "time" @@ -68,7 +69,7 @@ func (k *keyspaceGroupSvcDiscovery) update( keyspaceGroup *tsopb.KeyspaceGroup, newPrimaryAddr string, secondaryAddrs, addrs []string, -) (oldPrimaryAddr string, primarySwitched bool) { +) (oldPrimaryAddr string, primarySwitched, secondaryChanged bool) { k.Lock() defer k.Unlock() @@ -79,10 +80,13 @@ func (k *keyspaceGroupSvcDiscovery) update( k.primaryAddr = newPrimaryAddr } + if !reflect.DeepEqual(k.secondaryAddrs, secondaryAddrs) { + k.secondaryAddrs = secondaryAddrs + secondaryChanged = true + } + k.group = keyspaceGroup - k.secondaryAddrs = secondaryAddrs k.addrs = addrs - return } @@ -413,16 +417,43 @@ func (c *tsoServiceDiscovery) updateMember() error { log.Error("[tso] failed to get tso server", errs.ZapError(err)) return err } - keyspaceGroup, err := c.findGroupByKeyspaceID(c.keyspaceID, tsoServerAddr, updateMemberTimeout) - if err != nil { - if c.tsoServerDiscovery.countFailure() { - log.Error("[tso] failed to find the keyspace group", errs.ZapError(err)) + + var keyspaceGroup *tsopb.KeyspaceGroup + if len(tsoServerAddr) > 0 { + keyspaceGroup, err = c.findGroupByKeyspaceID(c.keyspaceID, tsoServerAddr, updateMemberTimeout) + if err != nil { + if c.tsoServerDiscovery.countFailure() { + log.Error("[tso] failed to find the keyspace group", errs.ZapError(err)) + } + return err + } + c.tsoServerDiscovery.resetFailure() + } else { + // There is no error but no tso server address found, which means + // the server side hasn't been upgraded to the version that + // processes and returns GetClusterInfoResponse.TsoUrls. In this case, + // we fall back to the old way of discovering the tso primary addresses + // from etcd directly. + log.Warn("[tso] no tso server address found,"+ + " fallback to the legacy path to discover from etcd directly", + zap.String("discovery-key", c.defaultDiscoveryKey)) + addrs, err := c.discoverWithLegacyPath() + if err != nil { + return err + } + if len(addrs) == 0 { + return errors.New("no tso server address found") + } + members := make([]*tsopb.KeyspaceGroupMember, 0, len(addrs)) + for _, addr := range addrs { + members = append(members, &tsopb.KeyspaceGroupMember{Address: addr}) + } + members[0].IsPrimary = true + keyspaceGroup = &tsopb.KeyspaceGroup{ + Id: defaultKeySpaceGroupID, + Members: members, } - return err } - c.tsoServerDiscovery.resetFailure() - - log.Info("[tso] update keyspace group", zap.String("keyspace-group", keyspaceGroup.String())) // Initialize the serving addresses from the returned keyspace group info. primaryAddr := "" @@ -449,12 +480,17 @@ func (c *tsoServiceDiscovery) updateMember() error { } } - oldPrimary, primarySwitched := c.keyspaceGroupSD.update(keyspaceGroup, primaryAddr, secondaryAddrs, addrs) + oldPrimary, primarySwitched, secondaryChanged := + c.keyspaceGroupSD.update(keyspaceGroup, primaryAddr, secondaryAddrs, addrs) if primarySwitched { if err := c.afterPrimarySwitched(oldPrimary, primaryAddr); err != nil { return err } } + if primarySwitched || secondaryChanged { + log.Info("[tso] updated keyspace group service discovery info", + zap.String("keyspace-group-service", keyspaceGroup.String())) + } // Even if the primary address is empty, we still updated other returned info above, including the // keyspace group info and the secondary addresses. @@ -470,6 +506,12 @@ func (c *tsoServiceDiscovery) updateMember() error { func (c *tsoServiceDiscovery) findGroupByKeyspaceID( keyspaceID uint32, tsoSrvAddr string, timeout time.Duration, ) (*tsopb.KeyspaceGroup, error) { + failpoint.Inject("unexpectedCallOfFindGroupByKeyspaceID", func(val failpoint.Value) { + keyspaceToCheck, ok := val.(int) + if ok && keyspaceID == uint32(keyspaceToCheck) { + panic("findGroupByKeyspaceID is called unexpectedly") + } + }) ctx, cancel := context.WithTimeout(c.ctx, timeout) defer cancel() @@ -526,21 +568,10 @@ func (c *tsoServiceDiscovery) getTSOServer(sd ServiceDiscovery) (string, error) }) if len(addrs) == 0 { // There is no error but no tso server address found, which means - // either the server side is experiencing some problems to get the - // tso primary addresses or the server hasn't been upgraded to the - // version which processes and returns GetClusterInfoResponse.TsoUrls. - // In this case, we fall back to the old way of discovering the tso - // primary addresses from etcd directly. - log.Warn("[tso] no tso server address found,"+ - " fallback to the legacy path to discover from etcd directly", - zap.String("discovery-key", c.defaultDiscoveryKey)) - addrs, err = c.discoverWithLegacyPath() - if err != nil { - return "", err - } - if len(addrs) == 0 { - return "", errors.New("no tso server address found") - } + // the server side hasn't been upgraded to the version that + // processes and returns GetClusterInfoResponse.TsoUrls. Return here + // and handle the fallback logic outside of this function. + return "", nil } log.Info("update tso server addresses", zap.Strings("addrs", addrs)) diff --git a/tests/integrations/tso/client_test.go b/tests/integrations/tso/client_test.go index c57e3a032d1..940def85e04 100644 --- a/tests/integrations/tso/client_test.go +++ b/tests/integrations/tso/client_test.go @@ -16,6 +16,7 @@ package tso import ( "context" + "fmt" "math" "math/rand" "strings" @@ -30,6 +31,7 @@ import ( "github.com/tikv/pd/client/testutil" bs "github.com/tikv/pd/pkg/basicserver" mcsutils "github.com/tikv/pd/pkg/mcs/utils" + "github.com/tikv/pd/pkg/slice" "github.com/tikv/pd/pkg/storage/endpoint" "github.com/tikv/pd/pkg/utils/tempurl" "github.com/tikv/pd/pkg/utils/tsoutil" @@ -227,30 +229,31 @@ func (suite *tsoClientTestSuite) TestGetTSAsync() { func (suite *tsoClientTestSuite) TestDiscoverTSOServiceWithLegacyPath() { re := suite.Require() + keyspaceID := uint32(1000000) + // Make sure this keyspace ID is not in use somewhere. + re.False(slice.Contains(suite.keyspaceIDs, keyspaceID)) + failpointValue := fmt.Sprintf(`return(%d)`, keyspaceID) // Simulate the case that the server has lower version than the client and returns no tso addrs // in the GetClusterInfo RPC. re.NoError(failpoint.Enable("github.com/tikv/pd/client/serverReturnsNoTSOAddrs", `return(true)`)) + re.NoError(failpoint.Enable("github.com/tikv/pd/client/unexpectedCallOfFindGroupByKeyspaceID", failpointValue)) defer func() { re.NoError(failpoint.Disable("github.com/tikv/pd/client/serverReturnsNoTSOAddrs")) + re.NoError(failpoint.Disable("github.com/tikv/pd/client/unexpectedCallOfFindGroupByKeyspaceID")) }() - var wg sync.WaitGroup - wg.Add(tsoRequestConcurrencyNumber) - for i := 0; i < tsoRequestConcurrencyNumber; i++ { - go func() { - defer wg.Done() - client := mcs.SetupClientWithDefaultKeyspaceName( - suite.ctx, re, strings.Split(suite.backendEndpoints, ",")) - var lastTS uint64 - for j := 0; j < tsoRequestRound; j++ { - physical, logical, err := client.GetTS(suite.ctx) - suite.NoError(err) - ts := tsoutil.ComposeTS(physical, logical) - suite.Less(lastTS, ts) - lastTS = ts - } - }() + + ctx, cancel := context.WithCancel(suite.ctx) + defer cancel() + client := mcs.SetupClientWithKeyspaceID( + ctx, re, keyspaceID, strings.Split(suite.backendEndpoints, ",")) + var lastTS uint64 + for j := 0; j < tsoRequestRound; j++ { + physical, logical, err := client.GetTS(ctx) + suite.NoError(err) + ts := tsoutil.ComposeTS(physical, logical) + suite.Less(lastTS, ts) + lastTS = ts } - wg.Wait() } // TestGetMinTS tests the correctness of GetMinTS.