Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Provide GetMinTS API to solve the compatibility issue brought by multi-timeline tso #6421

Merged
merged 10 commits into from
May 12, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
53 changes: 36 additions & 17 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -266,20 +266,20 @@ type serviceModeKeeper struct {
// triggering service mode switching concurrently.
sync.RWMutex
serviceMode pdpb.ServiceMode
tsoClient atomic.Value // *tsoClient
tsoClient *tsoClient
lhy1024 marked this conversation as resolved.
Show resolved Hide resolved
tsoSvcDiscovery ServiceDiscovery
}

func (smk *serviceModeKeeper) close() {
smk.Lock()
defer smk.Unlock()
switch smk.serviceMode {
func (k *serviceModeKeeper) close() {
k.Lock()
rleungx marked this conversation as resolved.
Show resolved Hide resolved
defer k.Unlock()
switch k.serviceMode {
case pdpb.ServiceMode_API_SVC_MODE:
smk.tsoSvcDiscovery.Close()
k.tsoSvcDiscovery.Close()
fallthrough
case pdpb.ServiceMode_PD_SVC_MODE:
if tsoCli := smk.tsoClient.Load(); tsoCli != nil {
tsoCli.(*tsoClient).Close()
if k.tsoClient != nil {
k.tsoClient.Close()
}
case pdpb.ServiceMode_UNKNOWN_SVC_MODE:
}
Expand Down Expand Up @@ -505,8 +505,8 @@ func (c *client) setServiceMode(newMode pdpb.ServiceMode) {
}
newTSOCli.Setup()
// Replace the old TSO client.
oldTSOClient := c.getTSOClient()
c.tsoClient.Store(newTSOCli)
oldTSOClient := c.tsoClient
c.tsoClient = newTSOCli
oldTSOClient.Close()
// Replace the old TSO service discovery if needed.
oldTSOSvcDiscovery := c.tsoSvcDiscovery
Expand All @@ -525,11 +525,10 @@ func (c *client) setServiceMode(newMode pdpb.ServiceMode) {
zap.String("new-mode", newMode.String()))
}

func (c *client) getTSOClient() *tsoClient {
if tsoCli := c.tsoClient.Load(); tsoCli != nil {
return tsoCli.(*tsoClient)
}
return nil
// getServiceClientProxy returns the current TSO client together with the
// current service mode. Both fields are read while holding the read lock, so
// the returned pair is a consistent snapshot.
func (c *client) getServiceClientProxy() (*tsoClient, pdpb.ServiceMode) {
	c.RLock()
	cli, mode := c.tsoClient, c.serviceMode
	c.RUnlock()
	return cli, mode
}

func (c *client) scheduleUpdateTokenConnection() {
Expand Down Expand Up @@ -694,7 +693,7 @@ func (c *client) GetLocalTSAsync(ctx context.Context, dcLocation string) TSFutur
req := tsoReqPool.Get().(*tsoRequest)
req.requestCtx = ctx
req.clientCtx = c.ctx
tsoClient := c.getTSOClient()
tsoClient, _ := c.getServiceClientProxy()
req.start = time.Now()
req.dcLocation = dcLocation

Expand Down Expand Up @@ -723,6 +722,26 @@ func (c *client) GetLocalTS(ctx context.Context, dcLocation string) (physical in
return resp.Wait()
}

func (c *client) GetMinTS(ctx context.Context) (physical int64, logical int64, err error) {
tsoClient, serviceMode := c.getServiceClientProxy()
if tsoClient == nil {
return 0, 0, errs.ErrClientGetMinTSO.FastGenByArgs("tso client is nil")
}

switch serviceMode {
case pdpb.ServiceMode_UNKNOWN_SVC_MODE:
return 0, 0, errs.ErrClientGetMinTSO.FastGenByArgs("unknown service mode")
case pdpb.ServiceMode_PD_SVC_MODE:
// If the service mode is switched to API during GetTS() call, which happens during migration,
// returning the default timeline should be fine.
Comment on lines +735 to +736
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need to consider local ts?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The GetTS has passed DCLocation to the GetMinTS RPC to cover local ts. It keeps backward compatibility with PD, but the TSO microservice doesn't support local ts.

return c.GetTS(ctx)
case pdpb.ServiceMode_API_SVC_MODE:
return tsoClient.getMinTS(ctx)
default:
return 0, 0, errs.ErrClientGetMinTSO.FastGenByArgs("undefined service mode")
}
}

func handleRegionResponse(res *pdpb.GetRegionResponse) *Region {
if res.Region == nil {
return nil
Expand Down Expand Up @@ -1414,7 +1433,7 @@ func (c *client) respForErr(observer prometheus.Observer, start time.Time, err e
// GetTSOAllocators returns {dc-location -> TSO allocator leader URL} connection map
// For test only.
func (c *client) GetTSOAllocators() *sync.Map {
tsoClient := c.getTSOClient()
tsoClient, _ := c.getServiceClientProxy()
if tsoClient == nil {
return nil
}
Expand Down
13 changes: 7 additions & 6 deletions client/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/stretchr/testify/require"
"github.com/tikv/pd/client/testutil"
"github.com/tikv/pd/client/tlsutil"
"github.com/tikv/pd/client/tsoutil"
"go.uber.org/goleak"
"google.golang.org/grpc"
)
Expand All @@ -32,13 +33,13 @@ func TestMain(m *testing.M) {
goleak.VerifyTestMain(m, testutil.LeakOptions...)
}

func TestTsLessEqual(t *testing.T) {
func TestTSLessEqual(t *testing.T) {
re := require.New(t)
re.True(tsLessEqual(9, 9, 9, 9))
re.True(tsLessEqual(8, 9, 9, 8))
re.False(tsLessEqual(9, 8, 8, 9))
re.False(tsLessEqual(9, 8, 9, 6))
re.True(tsLessEqual(9, 6, 9, 8))
re.True(tsoutil.TSLessEqual(9, 9, 9, 9))
re.True(tsoutil.TSLessEqual(8, 9, 9, 8))
re.False(tsoutil.TSLessEqual(9, 8, 8, 9))
re.False(tsoutil.TSLessEqual(9, 8, 9, 6))
re.True(tsoutil.TSLessEqual(9, 6, 9, 8))
}

func TestUpdateURLs(t *testing.T) {
Expand Down
1 change: 1 addition & 0 deletions client/errs/errno.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ var (
ErrClientTSOStreamClosed = errors.Normalize("encountered TSO stream being closed unexpectedly", errors.RFCCodeText("PD:client:ErrClientTSOStreamClosed"))
ErrClientGetTSOTimeout = errors.Normalize("get TSO timeout", errors.RFCCodeText("PD:client:ErrClientGetTSOTimeout"))
ErrClientGetTSO = errors.Normalize("get TSO failed, %v", errors.RFCCodeText("PD:client:ErrClientGetTSO"))
ErrClientGetMinTSO = errors.Normalize("get min TSO failed, %v", errors.RFCCodeText("PD:client:ErrClientGetMinTSO"))
ErrClientGetLeader = errors.Normalize("get leader failed, %v", errors.RFCCodeText("PD:client:ErrClientGetLeader"))
ErrClientGetMember = errors.Normalize("get member failed", errors.RFCCodeText("PD:client:ErrClientGetMember"))
ErrClientGetClusterInfo = errors.Normalize("get cluster info failed", errors.RFCCodeText("PD:client:ErrClientGetClusterInfo"))
Expand Down
2 changes: 1 addition & 1 deletion client/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ require (
github.com/opentracing/opentracing-go v1.2.0
github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c
github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00
github.com/pingcap/kvproto v0.0.0-20230426023724-d90a321b46be
github.com/pingcap/kvproto v0.0.0-20230511011722-6e0e8a7deaa1
github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3
github.com/prometheus/client_golang v1.11.1
github.com/stretchr/testify v1.8.2
Expand Down
4 changes: 2 additions & 2 deletions client/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -82,8 +82,8 @@ github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c h1:xpW9bvK+HuuTm
github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c/go.mod h1:X2r9ueLEUZgtx2cIogM0v4Zj5uvvzhuuiu7Pn8HzMPg=
github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00 h1:C3N3itkduZXDZFh4N3vQ5HEtld3S+Y+StULhWVvumU0=
github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00/go.mod h1:4qGtCB0QK0wBzKtFEGDhxXnSnbQApw1gc9siScUl8ew=
github.com/pingcap/kvproto v0.0.0-20230426023724-d90a321b46be h1:eHtwHgPzzm8aIZ4x8o7zg1b23cjUl0AikW+SDLpqf3E=
github.com/pingcap/kvproto v0.0.0-20230426023724-d90a321b46be/go.mod h1:guCyM5N+o+ru0TsoZ1hi9lDjUMs2sIBjW3ARTEpVbnk=
github.com/pingcap/kvproto v0.0.0-20230511011722-6e0e8a7deaa1 h1:VXQ6Du/nKZ9IQnI9NWMzKbftWu8NV5pQkSLKIRzzGN4=
github.com/pingcap/kvproto v0.0.0-20230511011722-6e0e8a7deaa1/go.mod h1:guCyM5N+o+ru0TsoZ1hi9lDjUMs2sIBjW3ARTEpVbnk=
github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3 h1:HR/ylkkLmGdSSDaD8IDP+SZrdhV1Kibl9KrHxJ9eciw=
github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3/go.mod h1:DWQW5jICDR7UJh4HtxXSM20Churx4CQL0fwL/SoOSA4=
github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
Expand Down
127 changes: 123 additions & 4 deletions client/tso_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,23 +22,29 @@ import (
"time"

"github.com/pingcap/errors"
"github.com/pingcap/kvproto/pkg/pdpb"
"github.com/pingcap/kvproto/pkg/tsopb"
"github.com/pingcap/log"
"github.com/tikv/pd/client/errs"
"github.com/tikv/pd/client/tsoutil"
"go.uber.org/zap"
"google.golang.org/grpc"
healthpb "google.golang.org/grpc/health/grpc_health_v1"
)

// TSOClient is the client used to get timestamps.
type TSOClient interface {
// GetTS gets a timestamp from PD.
// GetTS gets a timestamp from PD or TSO microservice.
GetTS(ctx context.Context) (int64, int64, error)
// GetTSAsync gets a timestamp from PD, without block the caller.
// GetTSAsync gets a timestamp from PD or TSO microservice, without block the caller.
GetTSAsync(ctx context.Context) TSFuture
// GetLocalTS gets a local timestamp from PD.
// GetLocalTS gets a local timestamp from PD or TSO microservice.
GetLocalTS(ctx context.Context, dcLocation string) (int64, int64, error)
// GetLocalTSAsync gets a local timestamp from PD, without block the caller.
// GetLocalTSAsync gets a local timestamp from PD or TSO microservice, without block the caller.
GetLocalTSAsync(ctx context.Context, dcLocation string) TSFuture
// GetMinTS gets a timestamp from PD or the minimal timestamp across all keyspace groups from
// the TSO microservice.
GetMinTS(ctx context.Context) (int64, int64, error)
}

type tsoRequest struct {
Expand Down Expand Up @@ -275,3 +281,116 @@ func (c *tsoClient) backupClientConn() (*grpc.ClientConn, string) {
}
return nil, ""
}

// getMinTS gets a timestamp from PD or the minimal timestamp across all keyspace groups from the TSO microservice.
func (c *tsoClient) getMinTS(ctx context.Context) (physical, logical int64, err error) {
// Immediately refresh the TSO server/pod list
addrs, err := c.svcDiscovery.DiscoverMicroservice(tsoService)
if err != nil {
return 0, 0, errs.ErrClientGetMinTSO.Wrap(err).GenWithStackByCause()
}
if len(addrs) == 0 {
return 0, 0, errs.ErrClientGetMinTSO.FastGenByArgs("no tso servers/pods discovered")
}

// Get the minimal timestamp from the TSO servers/pods
var mutex sync.Mutex
resps := make([]*tsopb.GetMinTSResponse, 0)
wg := sync.WaitGroup{}
wg.Add(len(addrs))
for _, addr := range addrs {
go func(addr string) {
defer wg.Done()
resp, err := c.getMinTSFromSingleServer(ctx, addr, c.option.timeout)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If there is no primary in a TSO server, do we need to wait for its result?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will it always wait for all TSO servers?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not really, but it does need to wait for all keyspace groups in serving status. It needs to meet the following conditions:
// Check the results. The returned minimal timestamp is valid if all the conditions are met:
// 1. The number of responses is equal to the number of TSO servers/pods.
// 2. The number of keyspace groups asked is equal to the number of TSO servers/pods.
// 3. The minimal timestamp is not zero.

Copy link
Contributor Author

@binshi-bing binshi-bing May 11, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We have discussed two more options for further optimization, which might be overkill for now. The two options we discussed are:

  1. Read all saved timestamps of all keyspace groups from etcd. Select the minimal ts, then subtract a time window. The con is that it could hurt availability when loading all saved timestamps in a transaction. If QPS is higher than the frequency of lead start/restart, it could impact availability.
  2. Based on the current approach, but every keyspace group caches the last ts. If the update time of the last ts for a keyspace group is outside a time window, it needs to query the timestamp to pull the timeline forward. It might be overkill for now, because we won't have too many timelines at the beginning and the QPS of GetMinTS is very low (a few times per day?).

if err != nil || resp == nil {
log.Warn("[tso] failed to get min ts from tso server",
zap.String("address", addr), zap.Error(err))
return
}
mutex.Lock()
defer mutex.Unlock()
resps = append(resps, resp)
}(addr)
}
wg.Wait()

// Check the results. The returned minimal timestamp is valid if all the conditions are met:
// 1. The number of responses is equal to the number of TSO servers/pods.
// 2. The number of keyspace groups asked is equal to the number of TSO servers/pods.
// 3. The minimal timestamp is not zero.
var (
minTS *pdpb.Timestamp
keyspaceGroupsAsked uint32
)
if len(resps) == 0 {
return 0, 0, errs.ErrClientGetMinTSO.FastGenByArgs("none of tso server/pod responded")
}
emptyTS := &pdpb.Timestamp{}
keyspaceGroupsTotal := resps[0].KeyspaceGroupsTotal
for _, resp := range resps {
if resp.KeyspaceGroupsTotal == 0 {
return 0, 0, errs.ErrClientGetMinTSO.FastGenByArgs("the tso service has no keyspace group")
}
if resp.KeyspaceGroupsTotal != keyspaceGroupsTotal {
return 0, 0, errs.ErrClientGetMinTSO.FastGenByArgs(
"the tso service has inconsistent keyspace group total count")
}
keyspaceGroupsAsked += resp.KeyspaceGroupsServing
if tsoutil.CompareTimestamp(resp.Timestamp, emptyTS) > 0 &&
(minTS == nil || tsoutil.CompareTimestamp(resp.Timestamp, minTS) < 0) {
minTS = resp.Timestamp
}
}

if keyspaceGroupsAsked != keyspaceGroupsTotal {
return 0, 0, errs.ErrClientGetMinTSO.FastGenByArgs(
fmt.Sprintf("can't query all the tso keyspace groups. Asked %d, expected %d",
keyspaceGroupsAsked, keyspaceGroupsTotal))
}

if minTS == nil {
return 0, 0, errs.ErrClientGetMinTSO.FastGenByArgs("the tso service is not ready")
}

return minTS.Physical, tsoutil.AddLogical(minTS.Logical, 0, minTS.SuffixBits), nil
}

// getMinTSFromSingleServer sends a single GetMinTS RPC to the TSO server at
// tsoSrvAddr and returns its response.
//
// The RPC runs under a context derived from ctx with the given timeout. Every
// failure path (connection failure, RPC error, nil response, or an error
// carried in the response header) is reported as ErrClientGetMinTSO, and the
// response is nil in that case.
func (c *tsoClient) getMinTSFromSingleServer(
	ctx context.Context, tsoSrvAddr string, timeout time.Duration,
) (*tsopb.GetMinTSResponse, error) {
	cc, err := c.svcDiscovery.GetOrCreateGRPCConn(tsoSrvAddr)
	if err != nil {
		// Keep the underlying cause in the message; otherwise the caller only
		// learns that the connection failed, not why.
		return nil, errs.ErrClientGetMinTSO.FastGenByArgs(
			fmt.Sprintf("can't connect to tso server %s: %v", tsoSrvAddr, err))
	}

	cctx, cancel := context.WithTimeout(ctx, timeout)
	defer cancel()

	resp, err := tsopb.NewTSOClient(cc).GetMinTS(
		cctx, &tsopb.GetMinTSRequest{
			Header: &tsopb.RequestHeader{
				ClusterId:       c.svcDiscovery.GetClusterID(),
				KeyspaceId:      c.svcDiscovery.GetKeyspaceID(),
				KeyspaceGroupId: c.svcDiscovery.GetKeyspaceGroupID(),
			},
			DcLocation: globalDCLocation,
		})
	if err != nil {
		// Attach the connection target and state to help diagnose RPC failures.
		attachErr := errors.Errorf("error:%s target:%s status:%s",
			err, cc.Target(), cc.GetState().String())
		return nil, errs.ErrClientGetMinTSO.Wrap(attachErr).GenWithStackByCause()
	}
	if resp == nil {
		attachErr := errors.Errorf("error:%s target:%s status:%s",
			"no min ts info collected", cc.Target(), cc.GetState().String())
		return nil, errs.ErrClientGetMinTSO.Wrap(attachErr).GenWithStackByCause()
	}
	// The RPC itself succeeded, but the server may still report an error in the header.
	if resp.GetHeader().GetError() != nil {
		attachErr := errors.Errorf("error:%s target:%s status:%s",
			resp.GetHeader().GetError().String(), cc.Target(), cc.GetState().String())
		return nil, errs.ErrClientGetMinTSO.Wrap(attachErr).GenWithStackByCause()
	}

	return resp, nil
}
21 changes: 5 additions & 16 deletions client/tso_dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/pingcap/log"
"github.com/tikv/pd/client/errs"
"github.com/tikv/pd/client/grpcutil"
"github.com/tikv/pd/client/tsoutil"
"go.uber.org/zap"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
Expand Down Expand Up @@ -715,19 +716,14 @@ func (c *tsoClient) processRequests(
return err
}
// `logical` is the largest ts's logical part here, we need to do the subtracting before we finish each TSO request.
firstLogical := addLogical(logical, -count+1, suffixBits)
firstLogical := tsoutil.AddLogical(logical, -count+1, suffixBits)
c.compareAndSwapTS(dcLocation, physical, firstLogical, suffixBits, count)
c.finishRequest(requests, physical, firstLogical, suffixBits, nil)
return nil
}

// addLogical advances logical by count steps. Because of the suffix bits,
// count is shifted left by suffixBits before it is added to the logical part.
func addLogical(logical, count int64, suffixBits uint32) int64 {
	shifted := count << suffixBits
	return logical + shifted
}

func (c *tsoClient) compareAndSwapTS(dcLocation string, physical, firstLogical int64, suffixBits uint32, count int64) {
largestLogical := addLogical(firstLogical, count-1, suffixBits)
largestLogical := tsoutil.AddLogical(firstLogical, count-1, suffixBits)
lastTSOInterface, loaded := c.lastTSMap.LoadOrStore(dcLocation, &lastTSO{
physical: physical,
// Save the largest logical part here
Expand All @@ -742,7 +738,7 @@ func (c *tsoClient) compareAndSwapTS(dcLocation string, physical, firstLogical i
// The TSO we get is a range like [largestLogical-count+1, largestLogical], so we save the last TSO's largest logical
// to compare with the new TSO's first logical. For example, if we have a TSO resp with logical 10, count 5, then
// all TSOs we get will be [6, 7, 8, 9, 10].
if tsLessEqual(physical, firstLogical, lastPhysical, lastLogical) {
if tsoutil.TSLessEqual(physical, firstLogical, lastPhysical, lastLogical) {
panic(errors.Errorf("%s timestamp fallback, newly acquired ts (%d, %d) is less or equal to last one (%d, %d)",
dcLocation, physical, firstLogical, lastPhysical, lastLogical))
}
Expand All @@ -751,19 +747,12 @@ func (c *tsoClient) compareAndSwapTS(dcLocation string, physical, firstLogical i
lastTSOPointer.logical = largestLogical
}

// tsLessEqual reports whether the timestamp (physical, logical) is less than
// or equal to (thatPhysical, thatLogical). The physical parts are compared
// first; the logical parts only decide the result when the physical parts tie.
func tsLessEqual(physical, logical, thatPhysical, thatLogical int64) bool {
	if physical != thatPhysical {
		return physical < thatPhysical
	}
	return logical <= thatLogical
}

func (c *tsoClient) finishRequest(requests []*tsoRequest, physical, firstLogical int64, suffixBits uint32, err error) {
for i := 0; i < len(requests); i++ {
if span := opentracing.SpanFromContext(requests[i].requestCtx); span != nil {
span.Finish()
}
requests[i].physical, requests[i].logical = physical, addLogical(firstLogical, int64(i), suffixBits)
requests[i].physical, requests[i].logical = physical, tsoutil.AddLogical(firstLogical, int64(i), suffixBits)
requests[i].done <- err
}
}
8 changes: 5 additions & 3 deletions client/tso_service_discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,10 +98,11 @@ type tsoServerDiscovery struct {
failureCount int
}

func (t *tsoServerDiscovery) countFailure() {
func (t *tsoServerDiscovery) countFailure() bool {
t.Lock()
defer t.Unlock()
t.failureCount++
return t.failureCount >= len(t.addrs)
}

func (t *tsoServerDiscovery) resetFailure() {
Expand Down Expand Up @@ -414,8 +415,9 @@ func (c *tsoServiceDiscovery) updateMember() error {
}
keyspaceGroup, err := c.findGroupByKeyspaceID(c.keyspaceID, tsoServerAddr, updateMemberTimeout)
if err != nil {
c.tsoServerDiscovery.countFailure()
log.Error("[tso] failed to find the keyspace group", errs.ZapError(err))
if c.tsoServerDiscovery.countFailure() {
log.Error("[tso] failed to find the keyspace group", errs.ZapError(err))
}
return err
}
c.tsoServerDiscovery.resetFailure()
Expand Down
Loading