From 2fc38e0018af31d4774917d97db5a03d5602ba02 Mon Sep 17 00:00:00 2001 From: Bin Shi Date: Thu, 16 Mar 2023 17:46:40 -0700 Subject: [PATCH] Add general tso forward/dispatcher for independent pd(tso)/tso services and cross cluster forwarding. Signed-off-by: Bin Shi more change Signed-off-by: Bin Shi --- pkg/errs/errno.go | 1 + pkg/mcs/tso/server/grpc_service.go | 197 +--------------- pkg/mcs/tso/server/server.go | 16 +- pkg/utils/tsoutil/tso_dispatcher.go | 207 +++++++++++++++++ pkg/utils/tsoutil/tso_proto_factory.go | 109 +++++++++ pkg/utils/tsoutil/tso_request.go | 160 +++++++++++++ pkg/utils/tsoutil/{tso.go => tsoutil.go} | 0 server/grpc_service.go | 279 ++--------------------- server/server.go | 11 +- 9 files changed, 528 insertions(+), 452 deletions(-) create mode 100644 pkg/utils/tsoutil/tso_dispatcher.go create mode 100644 pkg/utils/tsoutil/tso_proto_factory.go create mode 100644 pkg/utils/tsoutil/tso_request.go rename pkg/utils/tsoutil/{tso.go => tsoutil.go} (100%) diff --git a/pkg/errs/errno.go b/pkg/errs/errno.go index 155d7ef45a4b..706de8764d70 100644 --- a/pkg/errs/errno.go +++ b/pkg/errs/errno.go @@ -39,6 +39,7 @@ var ( ErrGenerateTimestamp = errors.Normalize("generate timestamp failed, %s", errors.RFCCodeText("PD:tso:ErrGenerateTimestamp")) ErrLogicOverflow = errors.Normalize("logic part overflow", errors.RFCCodeText("PD:tso:ErrLogicOverflow")) ErrProxyTSOTimeout = errors.Normalize("proxy tso timeout", errors.RFCCodeText("PD:tso:ErrProxyTSOTimeout")) + ErrTSOTypeConversion = errors.Normalize("type conversion error", errors.RFCCodeText("PD:tso:ErrTypeConversion")) ) // member errors diff --git a/pkg/mcs/tso/server/grpc_service.go b/pkg/mcs/tso/server/grpc_service.go index c650c4910ad3..31f7abb50c41 100644 --- a/pkg/mcs/tso/server/grpc_service.go +++ b/pkg/mcs/tso/server/grpc_service.go @@ -20,17 +20,14 @@ import ( "net/http" "time" - "github.com/pingcap/kvproto/pkg/pdpb" "github.com/pingcap/kvproto/pkg/tsopb" "github.com/pingcap/log" "github.com/pkg/errors" bs "github.com/tikv/pd/pkg/basicserver" - "github.com/tikv/pd/pkg/errs" "github.com/tikv/pd/pkg/mcs/registry" "github.com/tikv/pd/pkg/utils/apiutil" "github.com/tikv/pd/pkg/utils/grpcutil" - "github.com/tikv/pd/pkg/utils/logutil" - "go.uber.org/zap" + "github.com/tikv/pd/pkg/utils/tsoutil" "google.golang.org/grpc" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" @@ -116,16 +113,20 @@ func (s *Service) Tso(stream tsopb.TSO_TsoServer) error { streamCtx := stream.Context() forwardedHost := grpcutil.GetForwardedHost(streamCtx) if !s.IsLocalRequest(forwardedHost) { + clientConn, err := s.GetDelegateClient(s.ctx, forwardedHost) + if err != nil { + return err + } + if errCh == nil { doneCh = make(chan struct{}) defer close(doneCh) errCh = make(chan error) } - s.dispatchTSORequest(ctx, &tsoRequest{ - forwardedHost, - request, - stream, - }, forwardedHost, doneCh, errCh) + + tsoProtoFactory := s.tsoProtoTSOFactory + tsoRequest := tsoutil.NewTSOProtoTSORequest(forwardedHost, clientConn, request, stream) + s.tsoDispatcher.DispatchRequest(ctx, tsoRequest, tsoProtoFactory, doneCh, errCh) continue } @@ -174,181 +175,3 @@ func (s *Service) errorHeader(err *tsopb.Error) *tsopb.ResponseHeader { Error: err, } } - -type tsoRequest struct { - forwardedHost string - request *tsopb.TsoRequest - stream tsopb.TSO_TsoServer -} - -func (s *Service) dispatchTSORequest(ctx context.Context, request *tsoRequest, forwardedHost string, doneCh <-chan struct{}, errCh chan<- error) { - tsoRequestChInterface, loaded := s.tsoDispatcher.LoadOrStore(forwardedHost, make(chan *tsoRequest, maxMergeTSORequests)) - if !loaded { - tsDeadlineCh := make(chan deadline, 1) - go s.handleDispatcher(ctx, forwardedHost, tsoRequestChInterface.(chan *tsoRequest), tsDeadlineCh, doneCh, errCh) - go watchTSDeadline(ctx, tsDeadlineCh) - } - tsoRequestChInterface.(chan *tsoRequest) <- request -} - -func (s *Service) handleDispatcher(ctx context.Context, forwardedHost string, tsoRequestCh <-chan *tsoRequest, tsDeadlineCh chan<- deadline, doneCh <-chan struct{}, errCh chan<- error) { - defer logutil.LogPanic() - dispatcherCtx, ctxCancel := context.WithCancel(ctx) - defer ctxCancel() - defer s.tsoDispatcher.Delete(forwardedHost) - - var ( - forwardStream tsopb.TSO_TsoClient - cancel context.CancelFunc - ) - client, err := s.GetDelegateClient(ctx, forwardedHost) - if err != nil { - goto errHandling - } - log.Info("create tso forward stream", zap.String("forwarded-host", forwardedHost)) - forwardStream, cancel, err = s.CreateTsoForwardStream(client) -errHandling: - if err != nil || forwardStream == nil { - log.Error("create tso forwarding stream error", zap.String("forwarded-host", forwardedHost), errs.ZapError(errs.ErrGRPCCreateStream, err)) - select { - case <-dispatcherCtx.Done(): - return - case _, ok := <-doneCh: - if !ok { - return - } - case errCh <- err: - close(errCh) - return - } - } - defer cancel() - - requests := make([]*tsoRequest, maxMergeTSORequests+1) - for { - select { - case first := <-tsoRequestCh: - pendingTSOReqCount := len(tsoRequestCh) + 1 - requests[0] = first - for i := 1; i < pendingTSOReqCount; i++ { - requests[i] = <-tsoRequestCh - } - done := make(chan struct{}) - dl := deadline{ - timer: time.After(defaultTSOProxyTimeout), - done: done, - cancel: cancel, - } - select { - case tsDeadlineCh <- dl: - case <-dispatcherCtx.Done(): - return - } - err = s.processTSORequests(forwardStream, requests[:pendingTSOReqCount]) - close(done) - if err != nil { - log.Error("proxy forward tso error", zap.String("forwarded-host", forwardedHost), errs.ZapError(errs.ErrGRPCSend, err)) - select { - case <-dispatcherCtx.Done(): - return - case _, ok := <-doneCh: - if !ok { - return - } - case errCh <- err: - close(errCh) - return - } - } - case <-dispatcherCtx.Done(): - return - } - } -} - -func (s *Service) processTSORequests(forwardStream tsopb.TSO_TsoClient, requests []*tsoRequest) error { - start := time.Now() - // Merge the requests - count := uint32(0) - for _, request := range requests { - count += request.request.GetCount() - } - req := &tsopb.TsoRequest{ - Header: requests[0].request.GetHeader(), - Count: count, - // TODO: support Local TSO proxy forwarding. - DcLocation: requests[0].request.GetDcLocation(), - } - // Send to the leader stream. - if err := forwardStream.Send(req); err != nil { - return err - } - resp, err := forwardStream.Recv() - if err != nil { - return err - } - tsoProxyHandleDuration.Observe(time.Since(start).Seconds()) - tsoProxyBatchSize.Observe(float64(count)) - // Split the response - physical, logical, suffixBits := resp.GetTimestamp().GetPhysical(), resp.GetTimestamp().GetLogical(), resp.GetTimestamp().GetSuffixBits() - // `logical` is the largest ts's logical part here, we need to do the subtracting before we finish each TSO request. - // This is different from the logic of client batch, for example, if we have a largest ts whose logical part is 10, - // count is 5, then the splitting results should be 5 and 10. - firstLogical := addLogical(logical, -int64(count), suffixBits) - return s.finishTSORequest(requests, physical, firstLogical, suffixBits) -} - -// Because of the suffix, we need to shift the count before we add it to the logical part. -func addLogical(logical, count int64, suffixBits uint32) int64 { - return logical + count<