Skip to content

Commit

Permalink
Add general tso forward/dispatcher for independent pd(tso)/tso servic…
Browse files Browse the repository at this point in the history
…es and cross cluster forwarding.

Signed-off-by: Bin Shi <binshi.bing@gmail.com>

more change

Signed-off-by: Bin Shi <binshi.bing@gmail.com>
  • Loading branch information
binshi-bing committed Mar 20, 2023
1 parent 1e49be5 commit 86ee39e
Show file tree
Hide file tree
Showing 8 changed files with 543 additions and 486 deletions.
203 changes: 10 additions & 193 deletions pkg/mcs/tso/server/grpc_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,28 +20,19 @@ import (
"net/http"
"time"

"github.com/pingcap/kvproto/pkg/pdpb"
"github.com/pingcap/kvproto/pkg/tsopb"
"github.com/pingcap/log"
"github.com/pkg/errors"
bs "github.com/tikv/pd/pkg/basicserver"
"github.com/tikv/pd/pkg/errs"
"github.com/tikv/pd/pkg/mcs/registry"
"github.com/tikv/pd/pkg/utils/apiutil"
"github.com/tikv/pd/pkg/utils/grpcutil"
"github.com/tikv/pd/pkg/utils/logutil"
"go.uber.org/zap"
"github.com/tikv/pd/pkg/utils/tsoutil"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)

const (
// tso
maxMergeTSORequests = 10000
defaultTSOProxyTimeout = 3 * time.Second
)

// gRPC errors
var (
ErrNotStarted = status.Errorf(codes.Unavailable, "server not started")
Expand Down Expand Up @@ -116,16 +107,20 @@ func (s *Service) Tso(stream tsopb.TSO_TsoServer) error {
streamCtx := stream.Context()
forwardedHost := grpcutil.GetForwardedHost(streamCtx)
if !s.IsLocalRequest(forwardedHost) {
clientConn, err := s.GetDelegateClient(s.ctx, forwardedHost)
if err != nil {
return errors.WithStack(err)
}

if errCh == nil {
doneCh = make(chan struct{})
defer close(doneCh)
errCh = make(chan error)
}
s.dispatchTSORequest(ctx, &tsoRequest{
forwardedHost,
request,
stream,
}, forwardedHost, doneCh, errCh)

tsoProtoFactory := s.TSOProtoFactory
tsoRequest := tsoutil.NewTSOProtoTSORequest(forwardedHost, clientConn, request, stream)
s.tsoDispatcher.DispatchRequest(ctx, tsoRequest, tsoProtoFactory, doneCh, errCh)
continue
}

Expand Down Expand Up @@ -174,181 +169,3 @@ func (s *Service) errorHeader(err *tsopb.Error) *tsopb.ResponseHeader {
Error: err,
}
}

type tsoRequest struct {
forwardedHost string
request *tsopb.TsoRequest
stream tsopb.TSO_TsoServer
}

func (s *Service) dispatchTSORequest(ctx context.Context, request *tsoRequest, forwardedHost string, doneCh <-chan struct{}, errCh chan<- error) {
tsoRequestChInterface, loaded := s.tsoDispatcher.LoadOrStore(forwardedHost, make(chan *tsoRequest, maxMergeTSORequests))
if !loaded {
tsDeadlineCh := make(chan deadline, 1)
go s.handleDispatcher(ctx, forwardedHost, tsoRequestChInterface.(chan *tsoRequest), tsDeadlineCh, doneCh, errCh)
go watchTSDeadline(ctx, tsDeadlineCh)
}
tsoRequestChInterface.(chan *tsoRequest) <- request
}

func (s *Service) handleDispatcher(ctx context.Context, forwardedHost string, tsoRequestCh <-chan *tsoRequest, tsDeadlineCh chan<- deadline, doneCh <-chan struct{}, errCh chan<- error) {
defer logutil.LogPanic()
dispatcherCtx, ctxCancel := context.WithCancel(ctx)
defer ctxCancel()
defer s.tsoDispatcher.Delete(forwardedHost)

var (
forwardStream tsopb.TSO_TsoClient
cancel context.CancelFunc
)
client, err := s.GetDelegateClient(ctx, forwardedHost)
if err != nil {
goto errHandling
}
log.Info("create tso forward stream", zap.String("forwarded-host", forwardedHost))
forwardStream, cancel, err = s.CreateTsoForwardStream(client)
errHandling:
if err != nil || forwardStream == nil {
log.Error("create tso forwarding stream error", zap.String("forwarded-host", forwardedHost), errs.ZapError(errs.ErrGRPCCreateStream, err))
select {
case <-dispatcherCtx.Done():
return
case _, ok := <-doneCh:
if !ok {
return
}
case errCh <- err:
close(errCh)
return
}
}
defer cancel()

requests := make([]*tsoRequest, maxMergeTSORequests+1)
for {
select {
case first := <-tsoRequestCh:
pendingTSOReqCount := len(tsoRequestCh) + 1
requests[0] = first
for i := 1; i < pendingTSOReqCount; i++ {
requests[i] = <-tsoRequestCh
}
done := make(chan struct{})
dl := deadline{
timer: time.After(defaultTSOProxyTimeout),
done: done,
cancel: cancel,
}
select {
case tsDeadlineCh <- dl:
case <-dispatcherCtx.Done():
return
}
err = s.processTSORequests(forwardStream, requests[:pendingTSOReqCount])
close(done)
if err != nil {
log.Error("proxy forward tso error", zap.String("forwarded-host", forwardedHost), errs.ZapError(errs.ErrGRPCSend, err))
select {
case <-dispatcherCtx.Done():
return
case _, ok := <-doneCh:
if !ok {
return
}
case errCh <- err:
close(errCh)
return
}
}
case <-dispatcherCtx.Done():
return
}
}
}

func (s *Service) processTSORequests(forwardStream tsopb.TSO_TsoClient, requests []*tsoRequest) error {
start := time.Now()
// Merge the requests
count := uint32(0)
for _, request := range requests {
count += request.request.GetCount()
}
req := &tsopb.TsoRequest{
Header: requests[0].request.GetHeader(),
Count: count,
// TODO: support Local TSO proxy forwarding.
DcLocation: requests[0].request.GetDcLocation(),
}
// Send to the leader stream.
if err := forwardStream.Send(req); err != nil {
return err
}
resp, err := forwardStream.Recv()
if err != nil {
return err
}
tsoProxyHandleDuration.Observe(time.Since(start).Seconds())
tsoProxyBatchSize.Observe(float64(count))
// Split the response
physical, logical, suffixBits := resp.GetTimestamp().GetPhysical(), resp.GetTimestamp().GetLogical(), resp.GetTimestamp().GetSuffixBits()
// `logical` is the largest ts's logical part here, we need to do the subtracting before we finish each TSO request.
// This is different from the logic of client batch, for example, if we have a largest ts whose logical part is 10,
// count is 5, then the splitting results should be 5 and 10.
firstLogical := addLogical(logical, -int64(count), suffixBits)
return s.finishTSORequest(requests, physical, firstLogical, suffixBits)
}

// Because of the suffix, we need to shift the count before we add it to the logical part.
func addLogical(logical, count int64, suffixBits uint32) int64 {
return logical + count<<suffixBits
}

func (s *Service) finishTSORequest(requests []*tsoRequest, physical, firstLogical int64, suffixBits uint32) error {
countSum := int64(0)
for i := 0; i < len(requests); i++ {
count := requests[i].request.GetCount()
countSum += int64(count)
response := &tsopb.TsoResponse{
Header: s.header(),
Count: count,
Timestamp: &pdpb.Timestamp{
Physical: physical,
Logical: addLogical(firstLogical, countSum, suffixBits),
SuffixBits: suffixBits,
},
}
// Send back to the client.
if err := requests[i].stream.Send(response); err != nil {
return err
}
}
return nil
}

type deadline struct {
timer <-chan time.Time
done chan struct{}
cancel context.CancelFunc
}

func watchTSDeadline(ctx context.Context, tsDeadlineCh <-chan deadline) {
defer logutil.LogPanic()
ctx, cancel := context.WithCancel(ctx)
defer cancel()
for {
select {
case d := <-tsDeadlineCh:
select {
case <-d.timer:
log.Error("tso proxy request processing is canceled due to timeout", errs.ZapError(errs.ErrProxyTSOTimeout))
d.cancel()
case <-d.done:
continue
case <-ctx.Done():
return
}
case <-ctx.Done():
return
}
}
}
38 changes: 9 additions & 29 deletions pkg/mcs/tso/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ import (
"github.com/tikv/pd/pkg/utils/logutil"
"github.com/tikv/pd/pkg/utils/memberutil"
"github.com/tikv/pd/pkg/utils/metricutil"
"github.com/tikv/pd/pkg/utils/tsoutil"
"github.com/tikv/pd/pkg/versioninfo"
"go.etcd.io/etcd/clientv3"
"go.etcd.io/etcd/pkg/types"
Expand Down Expand Up @@ -111,8 +112,12 @@ type Server struct {
tsoAllocatorManager *tso.AllocatorManager
// Store as map[string]*grpc.ClientConn
clientConns sync.Map
// Store as map[string]chan *tsoRequest
tsoDispatcher sync.Map
// tsoDispatcher is used to dispatch different TSO requests to
// the corresponding forwarding TSO channel.
tsoDispatcher *tsoutil.TSODispatcher
// TSOProtoFactory is the abstract factory for creating tso
// related data structures defined in pd protocol
TSOProtoFactory *tsoutil.TSOProtoFactory

// Callback functions for different stages
// startCallbacks will be called after the server is started.
Expand Down Expand Up @@ -359,11 +364,6 @@ func (s *Server) GetTSOAllocatorManager() *tso.AllocatorManager {
return s.tsoAllocatorManager
}

// GetTSODispatcher gets the TSO Dispatcher
func (s *Server) GetTSODispatcher() *sync.Map {
return &s.tsoDispatcher
}

// IsLocalRequest checks if the forwarded host is the current host
func (s *Server) IsLocalRequest(forwardedHost string) bool {
// TODO: Check if the forwarded host is the current host.
Expand All @@ -373,16 +373,6 @@ func (s *Server) IsLocalRequest(forwardedHost string) bool {
return forwardedHost == ""
}

// CreateTsoForwardStream creates the forward stream
func (s *Server) CreateTsoForwardStream(client *grpc.ClientConn) (tsopb.TSO_TsoClient, context.CancelFunc, error) {
done := make(chan struct{})
ctx, cancel := context.WithCancel(s.ctx)
go checkStream(ctx, cancel, done)
forwardStream, err := tsopb.NewTSOClient(client).Tso(ctx)
done <- struct{}{}
return forwardStream, cancel, err
}

// GetDelegateClient returns grpc client connection talking to the forwarded host
func (s *Server) GetDelegateClient(ctx context.Context, forwardedHost string) (*grpc.ClientConn, error) {
client, ok := s.clientConns.Load(forwardedHost)
Expand Down Expand Up @@ -435,18 +425,6 @@ func (s *Server) SetExternalTS(externalTS uint64) error {
return nil
}

func checkStream(streamCtx context.Context, cancel context.CancelFunc, done chan struct{}) {
defer logutil.LogPanic()
select {
case <-done:
return
case <-time.After(3 * time.Second):
cancel()
case <-streamCtx.Done():
}
<-done
}

// GetConfig gets the config.
func (s *Server) GetConfig() *Config {
return s.cfg
Expand Down Expand Up @@ -584,6 +562,8 @@ func (s *Server) startServer() (err error) {
s.cfg.GetTLSConfig(), func() time.Duration { return s.cfg.MaxResetTSGap.Duration })
// Set up the Global TSO Allocator here, it will be initialized once this TSO participant campaigns leader successfully.
s.tsoAllocatorManager.SetUpAllocator(s.ctx, tso.GlobalDCLocation, s.participant.GetLeadership())
s.tsoDispatcher = tsoutil.NewTSODispatcher(tsoProxyHandleDuration, tsoProxyBatchSize)
s.TSOProtoFactory = &tsoutil.TSOProtoFactory{}

s.service = &Service{Server: s}

Expand Down
Loading

0 comments on commit 86ee39e

Please sign in to comment.