From 4860a5bf14ca46d6bad239e9872959cbe97b0110 Mon Sep 17 00:00:00 2001 From: Bin Shi <39923490+binshi-bing@users.noreply.github.com> Date: Tue, 20 Jun 2023 22:22:11 -0700 Subject: [PATCH] Add keyspace group info in the timestamp fallback log in the client. (#6654) ref tikv/pd#5895 Add keyspace group info in the timestamp fallback log in the client. Signed-off-by: Bin Shi --- client/tso_dispatcher.go | 34 ++++++++++++++++++++++++++-------- client/tso_stream.go | 8 +++++--- 2 files changed, 31 insertions(+), 11 deletions(-) diff --git a/client/tso_dispatcher.go b/client/tso_dispatcher.go index 30e7e670e09..c1e94f0f230 100644 --- a/client/tso_dispatcher.go +++ b/client/tso_dispatcher.go @@ -41,8 +41,9 @@ type tsoDispatcher struct { } type lastTSO struct { - physical int64 - logical int64 + keyspaceGroupID uint32 + physical int64 + logical int64 } const ( @@ -708,7 +709,7 @@ func (c *tsoClient) processRequests( requests := tbc.getCollectedRequests() count := int64(len(requests)) - physical, logical, suffixBits, err := stream.processRequests( + respKeyspaceGroupID, physical, logical, suffixBits, err := stream.processRequests( c.svcDiscovery.GetClusterID(), c.svcDiscovery.GetKeyspaceID(), c.svcDiscovery.GetKeyspaceGroupID(), dcLocation, requests, tbc.batchStartTime) if err != nil { @@ -717,15 +718,19 @@ func (c *tsoClient) processRequests( } // `logical` is the largest ts's logical part here, we need to do the subtracting before we finish each TSO request. firstLogical := tsoutil.AddLogical(logical, -count+1, suffixBits) - c.compareAndSwapTS(dcLocation, physical, firstLogical, suffixBits, count) + c.compareAndSwapTS(dcLocation, respKeyspaceGroupID, physical, firstLogical, suffixBits, count) c.finishRequest(requests, physical, firstLogical, suffixBits, nil) return nil } -func (c *tsoClient) compareAndSwapTS(dcLocation string, physical, firstLogical int64, suffixBits uint32, count int64) { +func (c *tsoClient) compareAndSwapTS( + dcLocation string, respKeyspaceGroupID uint32, + physical, firstLogical int64, suffixBits uint32, count int64, +) { largestLogical := tsoutil.AddLogical(firstLogical, count-1, suffixBits) lastTSOInterface, loaded := c.lastTSMap.LoadOrStore(dcLocation, &lastTSO{ - physical: physical, + keyspaceGroupID: respKeyspaceGroupID, + physical: physical, // Save the largest logical part here logical: largestLogical, }) @@ -733,17 +738,30 @@ func (c *tsoClient) compareAndSwapTS(dcLocation string, physical, firstLogical i return } lastTSOPointer := lastTSOInterface.(*lastTSO) + lastKeyspaceGroupID := lastTSOPointer.keyspaceGroupID lastPhysical := lastTSOPointer.physical lastLogical := lastTSOPointer.logical + + if lastKeyspaceGroupID != respKeyspaceGroupID { + log.Info("[tso] keyspace group changed", + zap.String("dc-location", dcLocation), + zap.Uint32("old-group-id", lastKeyspaceGroupID), + zap.Uint32("new-group-id", respKeyspaceGroupID)) + } + // The TSO we get is a range like [largestLogical-count+1, largestLogical], so we save the last TSO's largest logical // to compare with the new TSO's first logical. For example, if we have a TSO resp with logical 10, count 5, then // all TSOs we get will be [6, 7, 8, 9, 10]. if tsoutil.TSLessEqual(physical, firstLogical, lastPhysical, lastLogical) { panic(errors.Errorf( - "%s timestamp fallback, new ts (%d, %d) <= the last one (%d, %d). keyspace: %d, keyspace group: %d", + "%s timestamp fallback, new ts (%d, %d) <= the last one (%d, %d). "+ + "last keyspace group: %d, keyspace in request: %d, "+ + "keyspace group in request: %d, keyspace group in response: %d", dcLocation, physical, firstLogical, lastPhysical, lastLogical, - c.svcDiscovery.GetKeyspaceID(), c.svcDiscovery.GetKeyspaceGroupID())) + lastKeyspaceGroupID, c.svcDiscovery.GetKeyspaceID(), + c.svcDiscovery.GetKeyspaceGroupID(), respKeyspaceGroupID)) } + lastTSOPointer.keyspaceGroupID = respKeyspaceGroupID lastTSOPointer.physical = physical // Same as above, we save the largest logical part here. lastTSOPointer.logical = largestLogical diff --git a/client/tso_stream.go b/client/tso_stream.go index 5b658279cac..aaabbb1712e 100644 --- a/client/tso_stream.go +++ b/client/tso_stream.go @@ -102,7 +102,7 @@ type tsoStream interface { processRequests( clusterID uint64, keyspaceID, keyspaceGroupID uint32, dcLocation string, requests []*tsoRequest, batchStartTime time.Time, - ) (physical, logical int64, suffixBits uint32, err error) + ) (respKeyspaceGroupID uint32, physical, logical int64, suffixBits uint32, err error) } type pdTSOStream struct { @@ -111,7 +111,7 @@ type pdTSOStream struct { func (s *pdTSOStream) processRequests( clusterID uint64, _, _ uint32, dcLocation string, requests []*tsoRequest, batchStartTime time.Time, -) (physical, logical int64, suffixBits uint32, err error) { +) (respKeyspaceGroupID uint32, physical, logical int64, suffixBits uint32, err error) { start := time.Now() count := int64(len(requests)) req := &pdpb.TsoRequest{ @@ -149,6 +149,7 @@ func (s *pdTSOStream) processRequests( } ts := resp.GetTimestamp() + respKeyspaceGroupID = defaultKeySpaceGroupID physical, logical, suffixBits = ts.GetPhysical(), ts.GetLogical(), ts.GetSuffixBits() return } @@ -160,7 +161,7 @@ type tsoTSOStream struct { func (s *tsoTSOStream) processRequests( clusterID uint64, keyspaceID, keyspaceGroupID uint32, dcLocation string, requests []*tsoRequest, batchStartTime time.Time, -) (physical, logical int64, suffixBits uint32, err error) { +) (respKeyspaceGroupID uint32, physical, logical int64, suffixBits uint32, err error) { start := time.Now() count := int64(len(requests)) req := &tsopb.TsoRequest{ @@ -200,6 +201,7 @@ func (s *tsoTSOStream) processRequests( } ts := resp.GetTimestamp() + respKeyspaceGroupID = resp.GetHeader().GetKeyspaceGroupId() physical, logical, suffixBits = ts.GetPhysical(), ts.GetLogical(), ts.GetSuffixBits() return }