From b2275f669ff1c542b1cd0dc014c0217c1d8c5df7 Mon Sep 17 00:00:00 2001 From: Bin Shi <39923490+binshi-bing@users.noreply.github.com> Date: Sun, 21 May 2023 22:19:37 -0700 Subject: [PATCH] mcs, tso: implement GetMinTS gPRC on both API leader and PD client (#6488) ref tikv/pd#5895 implement GetMinTS gPRC on both API leader and the PD client. Signed-off-by: Bin Shi --- client/client.go | 51 +++++++-- client/go.mod | 2 +- client/go.sum | 4 +- client/tso_client.go | 116 ------------------- errors.toml | 5 + go.mod | 2 +- go.sum | 4 +- pkg/errs/errno.go | 1 + server/grpc_service.go | 191 +++++++++++++++++++++++++++++-- tests/integrations/client/go.mod | 2 +- tests/integrations/client/go.sum | 4 +- tests/integrations/mcs/go.mod | 2 +- tests/integrations/mcs/go.sum | 4 +- tests/integrations/tso/go.mod | 2 +- tests/integrations/tso/go.sum | 4 +- tools/pd-tso-bench/go.sum | 4 +- 16 files changed, 243 insertions(+), 155 deletions(-) diff --git a/client/client.go b/client/client.go index cdd0795ed35..298896f98ac 100644 --- a/client/client.go +++ b/client/client.go @@ -33,6 +33,7 @@ import ( "github.com/tikv/pd/client/errs" "github.com/tikv/pd/client/grpcutil" "github.com/tikv/pd/client/tlsutil" + "github.com/tikv/pd/client/tsoutil" "go.uber.org/zap" "google.golang.org/grpc" "google.golang.org/grpc/codes" @@ -633,10 +634,16 @@ func (c *client) setServiceMode(newMode pdpb.ServiceMode) { zap.String("new-mode", newMode.String())) } -func (c *client) getServiceClientProxy() (*tsoClient, pdpb.ServiceMode) { +func (c *client) getTSOClient() *tsoClient { c.RLock() defer c.RUnlock() - return c.tsoClient, c.serviceMode + return c.tsoClient +} + +func (c *client) getServiceMode() pdpb.ServiceMode { + c.RLock() + defer c.RUnlock() + return c.serviceMode } func (c *client) scheduleUpdateTokenConnection() { @@ -801,7 +808,7 @@ func (c *client) GetLocalTSAsync(ctx context.Context, dcLocation string) TSFutur req := tsoReqPool.Get().(*tsoRequest) req.requestCtx = ctx req.clientCtx = c.ctx - tsoClient, _ := c.getServiceClientProxy() + tsoClient := c.getTSOClient() req.start = time.Now() req.dcLocation = dcLocation @@ -831,11 +838,8 @@ func (c *client) GetLocalTS(ctx context.Context, dcLocation string) (physical in } func (c *client) GetMinTS(ctx context.Context) (physical int64, logical int64, err error) { - tsoClient, serviceMode := c.getServiceClientProxy() - if tsoClient == nil { - return 0, 0, errs.ErrClientGetMinTSO.FastGenByArgs("tso client is nil") - } - + // Handle compatibility issue in case of PD/API server doesn't support GetMinTS API. + serviceMode := c.getServiceMode() switch serviceMode { case pdpb.ServiceMode_UNKNOWN_SVC_MODE: return 0, 0, errs.ErrClientGetMinTSO.FastGenByArgs("unknown service mode") @@ -844,10 +848,37 @@ func (c *client) GetMinTS(ctx context.Context) (physical int64, logical int64, e // returning the default timeline should be fine. return c.GetTS(ctx) case pdpb.ServiceMode_API_SVC_MODE: - return tsoClient.getMinTS(ctx) default: return 0, 0, errs.ErrClientGetMinTSO.FastGenByArgs("undefined service mode") } + + // Call GetMinTS API to get the minimal TS from the API leader. + protoClient := c.getClient() + if protoClient == nil { + return 0, 0, errs.ErrClientGetProtoClient + } + + resp, err := protoClient.GetMinTS(ctx, &pdpb.GetMinTSRequest{ + Header: c.requestHeader(), + }) + if err != nil { + if strings.Contains(err.Error(), "Unimplemented") { + // If the method is not supported, we fallback to GetTS. + return c.GetTS(ctx) + } + return 0, 0, errs.ErrClientGetMinTSO.Wrap(err).GenWithStackByCause() + } + if resp == nil { + attachErr := errors.Errorf("error:%s", "no min ts info collected") + return 0, 0, errs.ErrClientGetMinTSO.Wrap(attachErr).GenWithStackByCause() + } + if resp.GetHeader().GetError() != nil { + attachErr := errors.Errorf("error:%s s", resp.GetHeader().GetError().String()) + return 0, 0, errs.ErrClientGetMinTSO.Wrap(attachErr).GenWithStackByCause() + } + + minTS := resp.GetTimestamp() + return minTS.Physical, tsoutil.AddLogical(minTS.Logical, 0, minTS.SuffixBits), nil } func handleRegionResponse(res *pdpb.GetRegionResponse) *Region { @@ -1541,7 +1572,7 @@ func (c *client) respForErr(observer prometheus.Observer, start time.Time, err e // GetTSOAllocators returns {dc-location -> TSO allocator leader URL} connection map // For test only. func (c *client) GetTSOAllocators() *sync.Map { - tsoClient, _ := c.getServiceClientProxy() + tsoClient := c.getTSOClient() if tsoClient == nil { return nil } diff --git a/client/go.mod b/client/go.mod index 1bb3a49045f..5e7bef4f4a1 100644 --- a/client/go.mod +++ b/client/go.mod @@ -8,7 +8,7 @@ require ( github.com/opentracing/opentracing-go v1.2.0 github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00 - github.com/pingcap/kvproto v0.0.0-20230511011722-6e0e8a7deaa1 + github.com/pingcap/kvproto v0.0.0-20230519091446-c8493bee862e github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3 github.com/prometheus/client_golang v1.11.1 github.com/stretchr/testify v1.8.2 diff --git a/client/go.sum b/client/go.sum index 1cfc3e28631..6416449889f 100644 --- a/client/go.sum +++ b/client/go.sum @@ -82,8 +82,8 @@ github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c h1:xpW9bvK+HuuTm github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c/go.mod h1:X2r9ueLEUZgtx2cIogM0v4Zj5uvvzhuuiu7Pn8HzMPg= github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00 h1:C3N3itkduZXDZFh4N3vQ5HEtld3S+Y+StULhWVvumU0= github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00/go.mod h1:4qGtCB0QK0wBzKtFEGDhxXnSnbQApw1gc9siScUl8ew= -github.com/pingcap/kvproto v0.0.0-20230511011722-6e0e8a7deaa1 h1:VXQ6Du/nKZ9IQnI9NWMzKbftWu8NV5pQkSLKIRzzGN4= -github.com/pingcap/kvproto v0.0.0-20230511011722-6e0e8a7deaa1/go.mod h1:guCyM5N+o+ru0TsoZ1hi9lDjUMs2sIBjW3ARTEpVbnk= +github.com/pingcap/kvproto v0.0.0-20230519091446-c8493bee862e h1:IYZyu8k7Su+QIUUcx0EPOnt3o1S5o+uh2gY6MEzUHwc= +github.com/pingcap/kvproto v0.0.0-20230519091446-c8493bee862e/go.mod h1:guCyM5N+o+ru0TsoZ1hi9lDjUMs2sIBjW3ARTEpVbnk= github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3 h1:HR/ylkkLmGdSSDaD8IDP+SZrdhV1Kibl9KrHxJ9eciw= github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3/go.mod h1:DWQW5jICDR7UJh4HtxXSM20Churx4CQL0fwL/SoOSA4= github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= diff --git a/client/tso_client.go b/client/tso_client.go index 9aae31bba5a..c326e3e7160 100644 --- a/client/tso_client.go +++ b/client/tso_client.go @@ -22,11 +22,8 @@ import ( "time" "github.com/pingcap/errors" - "github.com/pingcap/kvproto/pkg/pdpb" - "github.com/pingcap/kvproto/pkg/tsopb" "github.com/pingcap/log" "github.com/tikv/pd/client/errs" - "github.com/tikv/pd/client/tsoutil" "go.uber.org/zap" "google.golang.org/grpc" healthpb "google.golang.org/grpc/health/grpc_health_v1" @@ -281,116 +278,3 @@ func (c *tsoClient) backupClientConn() (*grpc.ClientConn, string) { } return nil, "" } - -// getMinTS gets a timestamp from PD or the minimal timestamp across all keyspace groups from the TSO microservice. -func (c *tsoClient) getMinTS(ctx context.Context) (physical, logical int64, err error) { - // Immediately refresh the TSO server/pod list - addrs, err := c.svcDiscovery.DiscoverMicroservice(tsoService) - if err != nil { - return 0, 0, errs.ErrClientGetMinTSO.Wrap(err).GenWithStackByCause() - } - if len(addrs) == 0 { - return 0, 0, errs.ErrClientGetMinTSO.FastGenByArgs("no tso servers/pods discovered") - } - - // Get the minimal timestamp from the TSO servers/pods - var mutex sync.Mutex - resps := make([]*tsopb.GetMinTSResponse, 0) - wg := sync.WaitGroup{} - wg.Add(len(addrs)) - for _, addr := range addrs { - go func(addr string) { - defer wg.Done() - resp, err := c.getMinTSFromSingleServer(ctx, addr, c.option.timeout) - if err != nil || resp == nil { - log.Warn("[tso] failed to get min ts from tso server", - zap.String("address", addr), zap.Error(err)) - return - } - mutex.Lock() - defer mutex.Unlock() - resps = append(resps, resp) - }(addr) - } - wg.Wait() - - // Check the results. The returned minimal timestamp is valid if all the conditions are met: - // 1. The number of responses is equal to the number of TSO servers/pods. - // 2. The number of keyspace groups asked is equal to the number of TSO servers/pods. - // 3. The minimal timestamp is not zero. - var ( - minTS *pdpb.Timestamp - keyspaceGroupsAsked uint32 - ) - if len(resps) == 0 { - return 0, 0, errs.ErrClientGetMinTSO.FastGenByArgs("none of tso server/pod responded") - } - emptyTS := &pdpb.Timestamp{} - keyspaceGroupsTotal := resps[0].KeyspaceGroupsTotal - for _, resp := range resps { - if resp.KeyspaceGroupsTotal == 0 { - return 0, 0, errs.ErrClientGetMinTSO.FastGenByArgs("the tso service has no keyspace group") - } - if resp.KeyspaceGroupsTotal != keyspaceGroupsTotal { - return 0, 0, errs.ErrClientGetMinTSO.FastGenByArgs( - "the tso service has inconsistent keyspace group total count") - } - keyspaceGroupsAsked += resp.KeyspaceGroupsServing - if tsoutil.CompareTimestamp(resp.Timestamp, emptyTS) > 0 && - (minTS == nil || tsoutil.CompareTimestamp(resp.Timestamp, minTS) < 0) { - minTS = resp.Timestamp - } - } - - if keyspaceGroupsAsked != keyspaceGroupsTotal { - return 0, 0, errs.ErrClientGetMinTSO.FastGenByArgs( - fmt.Sprintf("can't query all the tso keyspace groups. Asked %d, expected %d", - keyspaceGroupsAsked, keyspaceGroupsTotal)) - } - - if minTS == nil { - return 0, 0, errs.ErrClientGetMinTSO.FastGenByArgs("the tso service is not ready") - } - - return minTS.Physical, tsoutil.AddLogical(minTS.Logical, 0, minTS.SuffixBits), nil -} - -func (c *tsoClient) getMinTSFromSingleServer( - ctx context.Context, tsoSrvAddr string, timeout time.Duration, -) (*tsopb.GetMinTSResponse, error) { - cc, err := c.svcDiscovery.GetOrCreateGRPCConn(tsoSrvAddr) - if err != nil { - return nil, errs.ErrClientGetMinTSO.FastGenByArgs( - fmt.Sprintf("can't connect to tso server %s", tsoSrvAddr)) - } - - cctx, cancel := context.WithTimeout(ctx, timeout) - defer cancel() - - resp, err := tsopb.NewTSOClient(cc).GetMinTS( - cctx, &tsopb.GetMinTSRequest{ - Header: &tsopb.RequestHeader{ - ClusterId: c.svcDiscovery.GetClusterID(), - KeyspaceId: c.svcDiscovery.GetKeyspaceID(), - KeyspaceGroupId: c.svcDiscovery.GetKeyspaceGroupID(), - }, - DcLocation: globalDCLocation, - }) - if err != nil { - attachErr := errors.Errorf("error:%s target:%s status:%s", - err, cc.Target(), cc.GetState().String()) - return nil, errs.ErrClientGetMinTSO.Wrap(attachErr).GenWithStackByCause() - } - if resp == nil { - attachErr := errors.Errorf("error:%s target:%s status:%s", - "no min ts info collected", cc.Target(), cc.GetState().String()) - return nil, errs.ErrClientGetMinTSO.Wrap(attachErr).GenWithStackByCause() - } - if resp.GetHeader().GetError() != nil { - attachErr := errors.Errorf("error:%s target:%s status:%s", - resp.GetHeader().GetError().String(), cc.Target(), cc.GetState().String()) - return nil, errs.ErrClientGetMinTSO.Wrap(attachErr).GenWithStackByCause() - } - - return resp, nil -} diff --git a/errors.toml b/errors.toml index 540ed5c3e13..b5d4dafdf0d 100644 --- a/errors.toml +++ b/errors.toml @@ -71,6 +71,11 @@ error = ''' get member failed ''' +["PD:client:ErrClientGetMinTSO"] +error = ''' +get min TSO failed, %v +''' + ["PD:client:ErrClientGetTSO"] error = ''' get TSO failed, %v diff --git a/go.mod b/go.mod index 2500e2eb967..1316ff144be 100644 --- a/go.mod +++ b/go.mod @@ -29,7 +29,7 @@ require ( github.com/pingcap/errcode v0.3.0 github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00 - github.com/pingcap/kvproto v0.0.0-20230511011722-6e0e8a7deaa1 + github.com/pingcap/kvproto v0.0.0-20230519091446-c8493bee862e github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3 github.com/pingcap/sysutil v1.0.1-0.20230407040306-fb007c5aff21 github.com/pingcap/tidb-dashboard v0.0.0-20230508075335-d6e0218addd5 diff --git a/go.sum b/go.sum index 0866c4f1ccc..aafd65d09f2 100644 --- a/go.sum +++ b/go.sum @@ -422,8 +422,8 @@ github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c/go.mod h1:X2r9ue github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00 h1:C3N3itkduZXDZFh4N3vQ5HEtld3S+Y+StULhWVvumU0= github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00/go.mod h1:4qGtCB0QK0wBzKtFEGDhxXnSnbQApw1gc9siScUl8ew= github.com/pingcap/kvproto v0.0.0-20191211054548-3c6b38ea5107/go.mod h1:WWLmULLO7l8IOcQG+t+ItJ3fEcrL5FxF0Wu+HrMy26w= -github.com/pingcap/kvproto v0.0.0-20230511011722-6e0e8a7deaa1 h1:VXQ6Du/nKZ9IQnI9NWMzKbftWu8NV5pQkSLKIRzzGN4= -github.com/pingcap/kvproto v0.0.0-20230511011722-6e0e8a7deaa1/go.mod h1:guCyM5N+o+ru0TsoZ1hi9lDjUMs2sIBjW3ARTEpVbnk= +github.com/pingcap/kvproto v0.0.0-20230519091446-c8493bee862e h1:IYZyu8k7Su+QIUUcx0EPOnt3o1S5o+uh2gY6MEzUHwc= +github.com/pingcap/kvproto v0.0.0-20230519091446-c8493bee862e/go.mod h1:guCyM5N+o+ru0TsoZ1hi9lDjUMs2sIBjW3ARTEpVbnk= github.com/pingcap/log v0.0.0-20210625125904-98ed8e2eb1c7/go.mod h1:8AanEdAHATuRurdGxZXBz0At+9avep+ub7U1AGYLIMM= github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3 h1:HR/ylkkLmGdSSDaD8IDP+SZrdhV1Kibl9KrHxJ9eciw= github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3/go.mod h1:DWQW5jICDR7UJh4HtxXSM20Churx4CQL0fwL/SoOSA4= diff --git a/pkg/errs/errno.go b/pkg/errs/errno.go index 8e9fb83de09..d98b5e9dfd0 100644 --- a/pkg/errs/errno.go +++ b/pkg/errs/errno.go @@ -86,6 +86,7 @@ var ( ErrClientGetTSO = errors.Normalize("get TSO failed, %v", errors.RFCCodeText("PD:client:ErrClientGetTSO")) ErrClientGetLeader = errors.Normalize("get leader failed, %v", errors.RFCCodeText("PD:client:ErrClientGetLeader")) ErrClientGetMember = errors.Normalize("get member failed", errors.RFCCodeText("PD:client:ErrClientGetMember")) + ErrClientGetMinTSO = errors.Normalize("get min TSO failed, %v", errors.RFCCodeText("PD:client:ErrClientGetMinTSO")) ) // schedule errors diff --git a/server/grpc_service.go b/server/grpc_service.go index c143d7d9443..bd57c28db9c 100644 --- a/server/grpc_service.go +++ b/server/grpc_service.go @@ -21,6 +21,7 @@ import ( "path" "strconv" "strings" + "sync" "sync/atomic" "time" @@ -52,6 +53,7 @@ const ( heartbeatSendTimeout = 5 * time.Second maxRetryTimesRequestTSOServer = 3 retryIntervalRequestTSOServer = 500 * time.Millisecond + getMinTSFromTSOServerTimeout = 1 * time.Second ) // gRPC errors @@ -100,12 +102,7 @@ func (s *GrpcServer) GetClusterInfo(ctx context.Context, _ *pdpb.GetClusterInfoR // at startup and needs to get the cluster ID with the first request (i.e. GetMembers). if s.IsClosed() { return &pdpb.GetClusterInfoResponse{ - Header: &pdpb.ResponseHeader{ - Error: &pdpb.Error{ - Type: pdpb.ErrorType_UNKNOWN, - Message: errs.ErrServerNotStarted.FastGenByArgs().Error(), - }, - }, + Header: s.wrapErrorToHeader(pdpb.ErrorType_UNKNOWN, errs.ErrServerNotStarted.FastGenByArgs().Error()), }, nil } @@ -125,18 +122,160 @@ func (s *GrpcServer) GetClusterInfo(ctx context.Context, _ *pdpb.GetClusterInfoR }, nil } +// GetMinTS implements gRPC PDServer. In PD service mode, it simply returns a timestamp. +// In API service mode, it queries all tso servers and gets the minimum timestamp across +// all keyspace groups. +func (s *GrpcServer) GetMinTS( + ctx context.Context, request *pdpb.GetMinTSRequest, +) (*pdpb.GetMinTSResponse, error) { + if err := s.validateRequest(request.GetHeader()); err != nil { + return &pdpb.GetMinTSResponse{ + Header: s.wrapErrorToHeader(pdpb.ErrorType_UNKNOWN, err.Error()), + }, nil + } + + var ( + minTS *pdpb.Timestamp + err error + ) + if s.IsAPIServiceMode() { + minTS, err = s.GetMinTSFromTSOService(tso.GlobalDCLocation) + } else { + start := time.Now() + ts, internalErr := s.tsoAllocatorManager.HandleRequest(tso.GlobalDCLocation, 1) + if internalErr == nil { + tsoHandleDuration.Observe(time.Since(start).Seconds()) + } + minTS = &ts + } + if err != nil { + return &pdpb.GetMinTSResponse{ + Header: s.wrapErrorToHeader(pdpb.ErrorType_UNKNOWN, err.Error()), + Timestamp: minTS, + }, nil + } + + return &pdpb.GetMinTSResponse{ + Header: s.header(), + Timestamp: minTS, + }, nil +} + +// GetMinTSFromTSOService queries all tso servers and gets the minimum timestamp across +// all keyspace groups. +func (s *GrpcServer) GetMinTSFromTSOService(dcLocation string) (*pdpb.Timestamp, error) { + addrs := s.keyspaceGroupManager.GetTSOServiceAddrs() + if len(addrs) == 0 { + return &pdpb.Timestamp{}, errs.ErrGetMinTS.FastGenByArgs("no tso servers/pods discovered") + } + + // Get the minimal timestamp from the TSO servers/pods + var mutex sync.Mutex + resps := make([]*tsopb.GetMinTSResponse, 0) + wg := sync.WaitGroup{} + wg.Add(len(addrs)) + for _, addr := range addrs { + go func(addr string) { + defer wg.Done() + resp, err := s.getMinTSFromSingleServer(s.ctx, dcLocation, addr) + if err != nil || resp == nil { + log.Warn("failed to get min ts from tso server", + zap.String("address", addr), zap.Error(err)) + return + } + mutex.Lock() + defer mutex.Unlock() + resps = append(resps, resp) + }(addr) + } + wg.Wait() + + // Check the results. The returned minimal timestamp is valid if all the conditions are met: + // 1. The number of responses is equal to the number of TSO servers/pods. + // 2. The number of keyspace groups asked is equal to the number of TSO servers/pods. + // 3. The minimal timestamp is not zero. + var ( + minTS *pdpb.Timestamp + keyspaceGroupsAsked uint32 + ) + if len(resps) == 0 { + return &pdpb.Timestamp{}, errs.ErrGetMinTS.FastGenByArgs("none of tso server/pod responded") + } + emptyTS := &pdpb.Timestamp{} + keyspaceGroupsTotal := resps[0].KeyspaceGroupsTotal + for _, resp := range resps { + if resp.KeyspaceGroupsTotal == 0 { + return &pdpb.Timestamp{}, errs.ErrGetMinTS.FastGenByArgs("the tso service has no keyspace group") + } + if resp.KeyspaceGroupsTotal != keyspaceGroupsTotal { + return &pdpb.Timestamp{}, errs.ErrGetMinTS.FastGenByArgs( + "the tso service has inconsistent keyspace group total count") + } + keyspaceGroupsAsked += resp.KeyspaceGroupsServing + if tsoutil.CompareTimestamp(resp.Timestamp, emptyTS) > 0 && + (minTS == nil || tsoutil.CompareTimestamp(resp.Timestamp, minTS) < 0) { + minTS = resp.Timestamp + } + } + + if keyspaceGroupsAsked != keyspaceGroupsTotal { + return &pdpb.Timestamp{}, errs.ErrGetMinTS.FastGenByArgs( + fmt.Sprintf("can't query all the tso keyspace groups. Asked %d, expected %d", + keyspaceGroupsAsked, keyspaceGroupsTotal)) + } + + if minTS == nil { + return &pdpb.Timestamp{}, errs.ErrGetMinTS.FastGenByArgs("the tso service is not ready") + } + + return minTS, nil +} + +func (s *GrpcServer) getMinTSFromSingleServer( + ctx context.Context, dcLocation, tsoSrvAddr string, +) (*tsopb.GetMinTSResponse, error) { + cc, err := s.getDelegateClient(s.ctx, tsoSrvAddr) + if err != nil { + return nil, errs.ErrClientGetMinTSO.FastGenByArgs( + fmt.Sprintf("can't connect to tso server %s", tsoSrvAddr)) + } + + cctx, cancel := context.WithTimeout(ctx, getMinTSFromTSOServerTimeout) + defer cancel() + + resp, err := tsopb.NewTSOClient(cc).GetMinTS( + cctx, &tsopb.GetMinTSRequest{ + Header: &tsopb.RequestHeader{ + ClusterId: s.ClusterID(), + }, + DcLocation: dcLocation, + }) + if err != nil { + attachErr := errors.Errorf("error:%s target:%s status:%s", + err, cc.Target(), cc.GetState().String()) + return nil, errs.ErrClientGetMinTSO.Wrap(attachErr).GenWithStackByCause() + } + if resp == nil { + attachErr := errors.Errorf("error:%s target:%s status:%s", + "no min ts info collected", cc.Target(), cc.GetState().String()) + return nil, errs.ErrClientGetMinTSO.Wrap(attachErr).GenWithStackByCause() + } + if resp.GetHeader().GetError() != nil { + attachErr := errors.Errorf("error:%s target:%s status:%s", + resp.GetHeader().GetError().String(), cc.Target(), cc.GetState().String()) + return nil, errs.ErrClientGetMinTSO.Wrap(attachErr).GenWithStackByCause() + } + + return resp, nil +} + // GetMembers implements gRPC PDServer. func (s *GrpcServer) GetMembers(context.Context, *pdpb.GetMembersRequest) (*pdpb.GetMembersResponse, error) { // Here we purposely do not check the cluster ID because the client does not know the correct cluster ID // at startup and needs to get the cluster ID with the first request (i.e. GetMembers). if s.IsClosed() { return &pdpb.GetMembersResponse{ - Header: &pdpb.ResponseHeader{ - Error: &pdpb.Error{ - Type: pdpb.ErrorType_UNKNOWN, - Message: errs.ErrServerNotStarted.FastGenByArgs().Error(), - }, - }, + Header: s.wrapErrorToHeader(pdpb.ErrorType_UNKNOWN, errs.ErrServerNotStarted.FastGenByArgs().Error()), }, nil } members, err := cluster.GetMembers(s.GetClient()) @@ -1361,6 +1500,34 @@ func (s *GrpcServer) UpdateServiceGCSafePoint(ctx context.Context, request *pdpb }, nil } +// GetGCSafePointV2 implements gRPC PDServer. +// Note: we need latest version of kvproto/master, but there was earlier commit https://github.com/pingcap/kvproto/pull/1111 +// whose server side implementation hasn't been merged, so we add this method to avoid compile error. +func (s *GrpcServer) GetGCSafePointV2(_ context.Context, _ *pdpb.GetGCSafePointV2Request) (*pdpb.GetGCSafePointV2Response, error) { + return nil, errors.New("not implemented") +} + +// WatchGCSafePointV2 implements gRPC PDServer. +// Note: we need latest version of kvproto/master, but there was earlier commit https://github.com/pingcap/kvproto/pull/1111 +// whose server side implementation hasn't been merged, so we add this method to avoid compile error. +func (s *GrpcServer) WatchGCSafePointV2(_ *pdpb.WatchGCSafePointV2Request, server pdpb.PD_WatchGCSafePointV2Server) error { + return errors.New("not implemented") +} + +// UpdateGCSafePointV2 implements gRPC PDServer. +// Note: we need latest version of kvproto/master, but there was earlier commit https://github.com/pingcap/kvproto/pull/1111 +// whose server side implementation hasn't been merged, so we add this method to avoid compile error. +func (s *GrpcServer) UpdateGCSafePointV2(_ context.Context, _ *pdpb.UpdateGCSafePointV2Request) (*pdpb.UpdateGCSafePointV2Response, error) { + return nil, errors.New("not implemented") +} + +// UpdateServiceSafePointV2 implements gRPC PDServer. +// Note: we need latest version of kvproto/master, but there was earlier commit https://github.com/pingcap/kvproto/pull/1111 +// whose server side implementation hasn't been merged, so we add this method to avoid compile error. +func (s *GrpcServer) UpdateServiceSafePointV2(_ context.Context, _ *pdpb.UpdateServiceSafePointV2Request) (*pdpb.UpdateServiceSafePointV2Response, error) { + return nil, errors.New("not implemented") +} + // GetOperator gets information about the operator belonging to the specify region. func (s *GrpcServer) GetOperator(ctx context.Context, request *pdpb.GetOperatorRequest) (*pdpb.GetOperatorResponse, error) { fn := func(ctx context.Context, client *grpc.ClientConn) (interface{}, error) { diff --git a/tests/integrations/client/go.mod b/tests/integrations/client/go.mod index f03741e0536..ca60c3f16d6 100644 --- a/tests/integrations/client/go.mod +++ b/tests/integrations/client/go.mod @@ -13,7 +13,7 @@ replace google.golang.org/grpc v1.54.0 => google.golang.org/grpc v1.26.0 require ( github.com/docker/go-units v0.4.0 github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00 - github.com/pingcap/kvproto v0.0.0-20230511011722-6e0e8a7deaa1 + github.com/pingcap/kvproto v0.0.0-20230519091446-c8493bee862e github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3 github.com/stretchr/testify v1.8.2 github.com/tikv/pd v0.0.0-00010101000000-000000000000 diff --git a/tests/integrations/client/go.sum b/tests/integrations/client/go.sum index 2ec551503e0..a46978cbbd9 100644 --- a/tests/integrations/client/go.sum +++ b/tests/integrations/client/go.sum @@ -385,8 +385,8 @@ github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c/go.mod h1:X2r9ue github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00 h1:C3N3itkduZXDZFh4N3vQ5HEtld3S+Y+StULhWVvumU0= github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00/go.mod h1:4qGtCB0QK0wBzKtFEGDhxXnSnbQApw1gc9siScUl8ew= github.com/pingcap/kvproto v0.0.0-20191211054548-3c6b38ea5107/go.mod h1:WWLmULLO7l8IOcQG+t+ItJ3fEcrL5FxF0Wu+HrMy26w= -github.com/pingcap/kvproto v0.0.0-20230511011722-6e0e8a7deaa1 h1:VXQ6Du/nKZ9IQnI9NWMzKbftWu8NV5pQkSLKIRzzGN4= -github.com/pingcap/kvproto v0.0.0-20230511011722-6e0e8a7deaa1/go.mod h1:guCyM5N+o+ru0TsoZ1hi9lDjUMs2sIBjW3ARTEpVbnk= +github.com/pingcap/kvproto v0.0.0-20230519091446-c8493bee862e h1:IYZyu8k7Su+QIUUcx0EPOnt3o1S5o+uh2gY6MEzUHwc= +github.com/pingcap/kvproto v0.0.0-20230519091446-c8493bee862e/go.mod h1:guCyM5N+o+ru0TsoZ1hi9lDjUMs2sIBjW3ARTEpVbnk= github.com/pingcap/log v0.0.0-20191012051959-b742a5d432e9/go.mod h1:4rbK1p9ILyIfb6hU7OG2CiWSqMXnp3JMbiaVJ6mvoY8= github.com/pingcap/log v0.0.0-20210625125904-98ed8e2eb1c7/go.mod h1:8AanEdAHATuRurdGxZXBz0At+9avep+ub7U1AGYLIMM= github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3 h1:HR/ylkkLmGdSSDaD8IDP+SZrdhV1Kibl9KrHxJ9eciw= diff --git a/tests/integrations/mcs/go.mod b/tests/integrations/mcs/go.mod index 7c3d0249ac9..42efe03381a 100644 --- a/tests/integrations/mcs/go.mod +++ b/tests/integrations/mcs/go.mod @@ -12,7 +12,7 @@ replace google.golang.org/grpc v1.54.0 => google.golang.org/grpc v1.26.0 require ( github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00 - github.com/pingcap/kvproto v0.0.0-20230511011722-6e0e8a7deaa1 + github.com/pingcap/kvproto v0.0.0-20230519091446-c8493bee862e github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3 github.com/stretchr/testify v1.8.2 github.com/tikv/pd v0.0.0-00010101000000-000000000000 diff --git a/tests/integrations/mcs/go.sum b/tests/integrations/mcs/go.sum index 6fb521ea3af..60402b113dc 100644 --- a/tests/integrations/mcs/go.sum +++ b/tests/integrations/mcs/go.sum @@ -385,8 +385,8 @@ github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c/go.mod h1:X2r9ue github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00 h1:C3N3itkduZXDZFh4N3vQ5HEtld3S+Y+StULhWVvumU0= github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00/go.mod h1:4qGtCB0QK0wBzKtFEGDhxXnSnbQApw1gc9siScUl8ew= github.com/pingcap/kvproto v0.0.0-20191211054548-3c6b38ea5107/go.mod h1:WWLmULLO7l8IOcQG+t+ItJ3fEcrL5FxF0Wu+HrMy26w= -github.com/pingcap/kvproto v0.0.0-20230511011722-6e0e8a7deaa1 h1:VXQ6Du/nKZ9IQnI9NWMzKbftWu8NV5pQkSLKIRzzGN4= -github.com/pingcap/kvproto v0.0.0-20230511011722-6e0e8a7deaa1/go.mod h1:guCyM5N+o+ru0TsoZ1hi9lDjUMs2sIBjW3ARTEpVbnk= +github.com/pingcap/kvproto v0.0.0-20230519091446-c8493bee862e h1:IYZyu8k7Su+QIUUcx0EPOnt3o1S5o+uh2gY6MEzUHwc= +github.com/pingcap/kvproto v0.0.0-20230519091446-c8493bee862e/go.mod h1:guCyM5N+o+ru0TsoZ1hi9lDjUMs2sIBjW3ARTEpVbnk= github.com/pingcap/log v0.0.0-20210625125904-98ed8e2eb1c7/go.mod h1:8AanEdAHATuRurdGxZXBz0At+9avep+ub7U1AGYLIMM= github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3 h1:HR/ylkkLmGdSSDaD8IDP+SZrdhV1Kibl9KrHxJ9eciw= github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3/go.mod h1:DWQW5jICDR7UJh4HtxXSM20Churx4CQL0fwL/SoOSA4= diff --git a/tests/integrations/tso/go.mod b/tests/integrations/tso/go.mod index 84e4d6d3ced..aa608bfa848 100644 --- a/tests/integrations/tso/go.mod +++ b/tests/integrations/tso/go.mod @@ -13,7 +13,7 @@ replace google.golang.org/grpc v1.54.0 => google.golang.org/grpc v1.26.0 require ( github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00 - github.com/pingcap/kvproto v0.0.0-20230511011722-6e0e8a7deaa1 + github.com/pingcap/kvproto v0.0.0-20230519091446-c8493bee862e github.com/stretchr/testify v1.8.2 github.com/tikv/pd v0.0.0-00010101000000-000000000000 github.com/tikv/pd/client v0.0.0-00010101000000-000000000000 diff --git a/tests/integrations/tso/go.sum b/tests/integrations/tso/go.sum index 7d7db017ba1..10dc4f1d7c7 100644 --- a/tests/integrations/tso/go.sum +++ b/tests/integrations/tso/go.sum @@ -383,8 +383,8 @@ github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c/go.mod h1:X2r9ue github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00 h1:C3N3itkduZXDZFh4N3vQ5HEtld3S+Y+StULhWVvumU0= github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00/go.mod h1:4qGtCB0QK0wBzKtFEGDhxXnSnbQApw1gc9siScUl8ew= github.com/pingcap/kvproto v0.0.0-20191211054548-3c6b38ea5107/go.mod h1:WWLmULLO7l8IOcQG+t+ItJ3fEcrL5FxF0Wu+HrMy26w= -github.com/pingcap/kvproto v0.0.0-20230511011722-6e0e8a7deaa1 h1:VXQ6Du/nKZ9IQnI9NWMzKbftWu8NV5pQkSLKIRzzGN4= -github.com/pingcap/kvproto v0.0.0-20230511011722-6e0e8a7deaa1/go.mod h1:guCyM5N+o+ru0TsoZ1hi9lDjUMs2sIBjW3ARTEpVbnk= +github.com/pingcap/kvproto v0.0.0-20230519091446-c8493bee862e h1:IYZyu8k7Su+QIUUcx0EPOnt3o1S5o+uh2gY6MEzUHwc= +github.com/pingcap/kvproto v0.0.0-20230519091446-c8493bee862e/go.mod h1:guCyM5N+o+ru0TsoZ1hi9lDjUMs2sIBjW3ARTEpVbnk= github.com/pingcap/log v0.0.0-20210625125904-98ed8e2eb1c7/go.mod h1:8AanEdAHATuRurdGxZXBz0At+9avep+ub7U1AGYLIMM= github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3 h1:HR/ylkkLmGdSSDaD8IDP+SZrdhV1Kibl9KrHxJ9eciw= github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3/go.mod h1:DWQW5jICDR7UJh4HtxXSM20Churx4CQL0fwL/SoOSA4= diff --git a/tools/pd-tso-bench/go.sum b/tools/pd-tso-bench/go.sum index f4e31d3679a..87ccafd7314 100644 --- a/tools/pd-tso-bench/go.sum +++ b/tools/pd-tso-bench/go.sum @@ -851,8 +851,8 @@ github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c h1:xpW9bvK+HuuTm github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c/go.mod h1:X2r9ueLEUZgtx2cIogM0v4Zj5uvvzhuuiu7Pn8HzMPg= github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00 h1:C3N3itkduZXDZFh4N3vQ5HEtld3S+Y+StULhWVvumU0= github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00/go.mod h1:4qGtCB0QK0wBzKtFEGDhxXnSnbQApw1gc9siScUl8ew= -github.com/pingcap/kvproto v0.0.0-20230511011722-6e0e8a7deaa1 h1:VXQ6Du/nKZ9IQnI9NWMzKbftWu8NV5pQkSLKIRzzGN4= -github.com/pingcap/kvproto v0.0.0-20230511011722-6e0e8a7deaa1/go.mod h1:guCyM5N+o+ru0TsoZ1hi9lDjUMs2sIBjW3ARTEpVbnk= +github.com/pingcap/kvproto v0.0.0-20230519091446-c8493bee862e h1:IYZyu8k7Su+QIUUcx0EPOnt3o1S5o+uh2gY6MEzUHwc= +github.com/pingcap/kvproto v0.0.0-20230519091446-c8493bee862e/go.mod h1:guCyM5N+o+ru0TsoZ1hi9lDjUMs2sIBjW3ARTEpVbnk= github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3 h1:HR/ylkkLmGdSSDaD8IDP+SZrdhV1Kibl9KrHxJ9eciw= github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3/go.mod h1:DWQW5jICDR7UJh4HtxXSM20Churx4CQL0fwL/SoOSA4= github.com/pkg/diff v0.0.0-20210226163009-20ebb0f2a09e/go.mod h1:pJLUxLENpZxwdsKMEsNbx1VGcRFpLqf3715MtcvvzbA=