diff --git a/client/tso_dispatcher.go b/client/tso_dispatcher.go index 32d2e2e6d2c..37bea8db9e5 100644 --- a/client/tso_dispatcher.go +++ b/client/tso_dispatcher.go @@ -720,16 +720,16 @@ func (c *tsoClient) processRequests( c.finishRequest(requests, 0, 0, 0, err) return err } + // `logical` is the largest ts's logical part here, we need to do the subtracting before we finish each TSO request. + firstLogical := tsoutil.AddLogical(logical, -count+1, suffixBits) curTSOInfo := &tsoInfo{ tsoServer: stream.getServerAddr(), reqKeyspaceGroupID: reqKeyspaceGroupID, respKeyspaceGroupID: respKeyspaceGroupID, respReceivedAt: time.Now(), physical: physical, - logical: logical, // `logical` is the returned largest ts's logical part + logical: tsoutil.AddLogical(firstLogical, count-1, suffixBits), } - // Get the first logical from the largest logic returned to do the subtracting before we finish each TSO request. - firstLogical := tsoutil.AddLogical(logical, -count+1, suffixBits) c.compareAndSwapTS(dcLocation, curTSOInfo, physical, firstLogical) c.finishRequest(requests, physical, firstLogical, suffixBits, nil) return nil @@ -740,12 +740,11 @@ func (c *tsoClient) compareAndSwapTS( curTSOInfo *tsoInfo, physical, firstLogical int64, ) { - prevVal, loaded := c.lastTSOInfoMap.Swap(dcLocation, curTSOInfo) + val, loaded := c.lastTSOInfoMap.LoadOrStore(dcLocation, curTSOInfo) if !loaded { return } - lastTSOInfo := prevVal.(*tsoInfo) - + lastTSOInfo := val.(*tsoInfo) if lastTSOInfo.respKeyspaceGroupID != curTSOInfo.respKeyspaceGroupID { log.Info("[tso] keyspace group changed", zap.String("dc-location", dcLocation), @@ -773,6 +772,12 @@ func (c *tsoClient) compareAndSwapTS( zap.Time("cur-response-received-at", curTSOInfo.respReceivedAt), ) } + lastTSOInfo.tsoServer = curTSOInfo.tsoServer + lastTSOInfo.reqKeyspaceGroupID = curTSOInfo.reqKeyspaceGroupID + lastTSOInfo.respKeyspaceGroupID = curTSOInfo.respKeyspaceGroupID + lastTSOInfo.respReceivedAt = curTSOInfo.respReceivedAt + lastTSOInfo.physical = curTSOInfo.physical + lastTSOInfo.logical = curTSOInfo.logical } func (c *tsoClient) finishRequest(requests []*tsoRequest, physical, firstLogical int64, suffixBits uint32, err error) {