Add general tso forward/dispatcher for independent pd(tso)/tso services and cross cluster forwarding.

Signed-off-by: Bin Shi <binshi.bing@gmail.com>

more change

Signed-off-by: Bin Shi <binshi.bing@gmail.com>
binshi-bing committed Mar 18, 2023
1 parent a330f0b commit 2fc38e0
Showing 9 changed files with 528 additions and 452 deletions.
1 change: 1 addition & 0 deletions pkg/errs/errno.go
@@ -39,6 +39,7 @@ var (
	ErrGenerateTimestamp = errors.Normalize("generate timestamp failed, %s", errors.RFCCodeText("PD:tso:ErrGenerateTimestamp"))
	ErrLogicOverflow     = errors.Normalize("logic part overflow", errors.RFCCodeText("PD:tso:ErrLogicOverflow"))
	ErrProxyTSOTimeout   = errors.Normalize("proxy tso timeout", errors.RFCCodeText("PD:tso:ErrProxyTSOTimeout"))
+	ErrTSOTypeConversion = errors.Normalize("type conversion error", errors.RFCCodeText("PD:tso:ErrTypeConversion"))
)

// member errors
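The call sites that raise this new error live in the tsoutil helpers, which are not expanded on this page. The following is only a plausible sketch of such a call site; the helper name toTSORequestProto is invented for illustration, and errs.ErrTSOTypeConversion.FastGenByArgs() follows the usual PD errs convention:

package tsoutil // illustrative placement; the actual package contents are not shown in this excerpt

import (
	"github.com/pingcap/kvproto/pkg/tsopb"
	"github.com/tikv/pd/pkg/errs"
)

// toTSORequestProto is a hypothetical helper: it converts a value drawn from
// a generic request channel back to its concrete proto type, returning the
// new ErrTSOTypeConversion error when the type assertion fails.
func toTSORequestProto(v interface{}) (*tsopb.TsoRequest, error) {
	req, ok := v.(*tsopb.TsoRequest)
	if !ok {
		return nil, errs.ErrTSOTypeConversion.FastGenByArgs()
	}
	return req, nil
}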
197 changes: 10 additions & 187 deletions pkg/mcs/tso/server/grpc_service.go
@@ -20,17 +20,14 @@ import (
	"net/http"
	"time"

-	"github.com/pingcap/kvproto/pkg/pdpb"
	"github.com/pingcap/kvproto/pkg/tsopb"
-	"github.com/pingcap/log"
	"github.com/pkg/errors"
	bs "github.com/tikv/pd/pkg/basicserver"
	"github.com/tikv/pd/pkg/errs"
	"github.com/tikv/pd/pkg/mcs/registry"
	"github.com/tikv/pd/pkg/utils/apiutil"
	"github.com/tikv/pd/pkg/utils/grpcutil"
-	"github.com/tikv/pd/pkg/utils/logutil"
-	"go.uber.org/zap"
+	"github.com/tikv/pd/pkg/utils/tsoutil"
	"google.golang.org/grpc"
	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/status"
@@ -116,16 +113,20 @@ func (s *Service) Tso(stream tsopb.TSO_TsoServer) error {
		streamCtx := stream.Context()
		forwardedHost := grpcutil.GetForwardedHost(streamCtx)
		if !s.IsLocalRequest(forwardedHost) {
+			clientConn, err := s.GetDelegateClient(s.ctx, forwardedHost)
+			if err != nil {
+				return err
+			}
+
			if errCh == nil {
				doneCh = make(chan struct{})
				defer close(doneCh)
				errCh = make(chan error)
			}
-			s.dispatchTSORequest(ctx, &tsoRequest{
-				forwardedHost,
-				request,
-				stream,
-			}, forwardedHost, doneCh, errCh)
+
+			tsoProtoFactory := s.tsoProtoTSOFactory
+			tsoRequest := tsoutil.NewTSOProtoTSORequest(forwardedHost, clientConn, request, stream)
+			s.tsoDispatcher.DispatchRequest(ctx, tsoRequest, tsoProtoFactory, doneCh, errCh)
			continue
		}

@@ -174,181 +175,3 @@ func (s *Service) errorHeader(err *tsopb.Error) *tsopb.ResponseHeader {
		Error: err,
	}
}
-
-type tsoRequest struct {
-	forwardedHost string
-	request       *tsopb.TsoRequest
-	stream        tsopb.TSO_TsoServer
-}
-
-func (s *Service) dispatchTSORequest(ctx context.Context, request *tsoRequest, forwardedHost string, doneCh <-chan struct{}, errCh chan<- error) {
-	tsoRequestChInterface, loaded := s.tsoDispatcher.LoadOrStore(forwardedHost, make(chan *tsoRequest, maxMergeTSORequests))
-	if !loaded {
-		tsDeadlineCh := make(chan deadline, 1)
-		go s.handleDispatcher(ctx, forwardedHost, tsoRequestChInterface.(chan *tsoRequest), tsDeadlineCh, doneCh, errCh)
-		go watchTSDeadline(ctx, tsDeadlineCh)
-	}
-	tsoRequestChInterface.(chan *tsoRequest) <- request
-}
-
-func (s *Service) handleDispatcher(ctx context.Context, forwardedHost string, tsoRequestCh <-chan *tsoRequest, tsDeadlineCh chan<- deadline, doneCh <-chan struct{}, errCh chan<- error) {
-	defer logutil.LogPanic()
-	dispatcherCtx, ctxCancel := context.WithCancel(ctx)
-	defer ctxCancel()
-	defer s.tsoDispatcher.Delete(forwardedHost)
-
-	var (
-		forwardStream tsopb.TSO_TsoClient
-		cancel        context.CancelFunc
-	)
-	client, err := s.GetDelegateClient(ctx, forwardedHost)
-	if err != nil {
-		goto errHandling
-	}
-	log.Info("create tso forward stream", zap.String("forwarded-host", forwardedHost))
-	forwardStream, cancel, err = s.CreateTsoForwardStream(client)
-errHandling:
-	if err != nil || forwardStream == nil {
-		log.Error("create tso forwarding stream error", zap.String("forwarded-host", forwardedHost), errs.ZapError(errs.ErrGRPCCreateStream, err))
-		select {
-		case <-dispatcherCtx.Done():
-			return
-		case _, ok := <-doneCh:
-			if !ok {
-				return
-			}
-		case errCh <- err:
-			close(errCh)
-			return
-		}
-	}
-	defer cancel()
-
-	requests := make([]*tsoRequest, maxMergeTSORequests+1)
-	for {
-		select {
-		case first := <-tsoRequestCh:
-			pendingTSOReqCount := len(tsoRequestCh) + 1
-			requests[0] = first
-			for i := 1; i < pendingTSOReqCount; i++ {
-				requests[i] = <-tsoRequestCh
-			}
-			done := make(chan struct{})
-			dl := deadline{
-				timer:  time.After(defaultTSOProxyTimeout),
-				done:   done,
-				cancel: cancel,
-			}
-			select {
-			case tsDeadlineCh <- dl:
-			case <-dispatcherCtx.Done():
-				return
-			}
-			err = s.processTSORequests(forwardStream, requests[:pendingTSOReqCount])
-			close(done)
-			if err != nil {
-				log.Error("proxy forward tso error", zap.String("forwarded-host", forwardedHost), errs.ZapError(errs.ErrGRPCSend, err))
-				select {
-				case <-dispatcherCtx.Done():
-					return
-				case _, ok := <-doneCh:
-					if !ok {
-						return
-					}
-				case errCh <- err:
-					close(errCh)
-					return
-				}
-			}
-		case <-dispatcherCtx.Done():
-			return
-		}
-	}
-}
-
-func (s *Service) processTSORequests(forwardStream tsopb.TSO_TsoClient, requests []*tsoRequest) error {
-	start := time.Now()
-	// Merge the requests
-	count := uint32(0)
-	for _, request := range requests {
-		count += request.request.GetCount()
-	}
-	req := &tsopb.TsoRequest{
-		Header: requests[0].request.GetHeader(),
-		Count:  count,
-		// TODO: support Local TSO proxy forwarding.
-		DcLocation: requests[0].request.GetDcLocation(),
-	}
-	// Send to the leader stream.
-	if err := forwardStream.Send(req); err != nil {
-		return err
-	}
-	resp, err := forwardStream.Recv()
-	if err != nil {
-		return err
-	}
-	tsoProxyHandleDuration.Observe(time.Since(start).Seconds())
-	tsoProxyBatchSize.Observe(float64(count))
-	// Split the response
-	physical, logical, suffixBits := resp.GetTimestamp().GetPhysical(), resp.GetTimestamp().GetLogical(), resp.GetTimestamp().GetSuffixBits()
-	// `logical` is the largest ts's logical part here, we need to do the subtracting before we finish each TSO request.
-	// This is different from the logic of client batch, for example, if we have a largest ts whose logical part is 10,
-	// count is 5, then the splitting results should be 5 and 10.
-	firstLogical := addLogical(logical, -int64(count), suffixBits)
-	return s.finishTSORequest(requests, physical, firstLogical, suffixBits)
-}
-
-// Because of the suffix, we need to shift the count before we add it to the logical part.
-func addLogical(logical, count int64, suffixBits uint32) int64 {
-	return logical + count<<suffixBits
-}
-
-func (s *Service) finishTSORequest(requests []*tsoRequest, physical, firstLogical int64, suffixBits uint32) error {
-	countSum := int64(0)
-	for i := 0; i < len(requests); i++ {
-		count := requests[i].request.GetCount()
-		countSum += int64(count)
-		response := &tsopb.TsoResponse{
-			Header:    s.header(),
-			Count:     count,
-			Timestamp: &pdpb.Timestamp{
-				Physical:   physical,
-				Logical:    addLogical(firstLogical, countSum, suffixBits),
-				SuffixBits: suffixBits,
-			},
-		}
-		// Send back to the client.
-		if err := requests[i].stream.Send(response); err != nil {
-			return err
-		}
-	}
-	return nil
-}
-
-type deadline struct {
-	timer  <-chan time.Time
-	done   chan struct{}
-	cancel context.CancelFunc
-}
-
-func watchTSDeadline(ctx context.Context, tsDeadlineCh <-chan deadline) {
-	defer logutil.LogPanic()
-	ctx, cancel := context.WithCancel(ctx)
-	defer cancel()
-	for {
-		select {
-		case d := <-tsDeadlineCh:
-			select {
-			case <-d.timer:
-				log.Error("tso proxy request processing is canceled due to timeout", errs.ZapError(errs.ErrProxyTSOTimeout))
-				d.cancel()
-			case <-d.done:
-				continue
-			case <-ctx.Done():
-				return
-			}
-		case <-ctx.Done():
-			return
-		}
-	}
-}
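For context on the arithmetic that just moved out of this file: as the comment in processTSORequests notes, the forwarded response carries the largest logical part of the whole batch, so with a largest logical of 10 and a merged count of 5 the batch starts at firstLogical = 10 - 5 = 5, and each request is finished with the largest logical it owns. A self-contained sketch of that arithmetic (suffixBits = 0 for simplicity; the two request counts are made up for illustration):

package main

import "fmt"

// addLogical mirrors the removed helper: the count is shifted by suffixBits
// before being added to the logical part.
func addLogical(logical, count int64, suffixBits uint32) int64 {
	return logical + count<<suffixBits
}

func main() {
	var (
		logical    int64  = 10 // largest logical part in the merged response
		suffixBits uint32 = 0  // no Local TSO suffix in this example
	)
	// Two merged requests asking for 2 and 3 timestamps respectively.
	counts := []int64{2, 3}
	var total int64
	for _, c := range counts {
		total += c
	}
	firstLogical := addLogical(logical, -total, suffixBits) // 10 - 5 = 5
	countSum := int64(0)
	for i, c := range counts {
		countSum += c
		// Each request is answered with the largest logical it owns:
		// request 0 gets 7 (covering 6 and 7), request 1 gets 10 (covering 8, 9, 10).
		fmt.Printf("request %d -> logical %d\n", i, addLogical(firstLogical, countSum, suffixBits))
	}
}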
16 changes: 9 additions & 7 deletions pkg/mcs/tso/server/server.go
@@ -53,6 +53,7 @@ import (
	"github.com/tikv/pd/pkg/utils/logutil"
	"github.com/tikv/pd/pkg/utils/memberutil"
	"github.com/tikv/pd/pkg/utils/metricutil"
+	"github.com/tikv/pd/pkg/utils/tsoutil"
	"github.com/tikv/pd/pkg/versioninfo"
	"go.etcd.io/etcd/clientv3"
	"go.etcd.io/etcd/pkg/types"
@@ -105,8 +106,12 @@ type Server struct {
	tsoAllocatorManager *tso.AllocatorManager
	// Store as map[string]*grpc.ClientConn
	clientConns sync.Map
-	// Store as map[string]chan *tsoRequest
-	tsoDispatcher sync.Map
+	// tsoDispatcher is used to dispatch different TSO requests to
+	// the corresponding forwarding TSO channel.
+	tsoDispatcher *tsoutil.TSODispatcher
+	// tsoProtoTSOFactory is the abstract factory for creating tso
+	// related data structures defined in pd protocol
+	tsoProtoTSOFactory *tsoutil.TSOProtoTSOFactory

	// Callback functions for different stages
	// startCallbacks will be called after the server is started.
@@ -354,11 +359,6 @@ func (s *Server) GetTSOAllocatorManager() *tso.AllocatorManager {
	return s.tsoAllocatorManager
}

-// GetTSODispatcher gets the TSO Dispatcher
-func (s *Server) GetTSODispatcher() *sync.Map {
-	return &s.tsoDispatcher
-}
-
// IsLocalRequest checks if the forwarded host is the current host
func (s *Server) IsLocalRequest(forwardedHost string) bool {
	// TODO: Check if the forwarded host is the current host.
@@ -578,6 +578,8 @@ func (s *Server) startServer() (err error) {
		s.cfg.GetTLSConfig(), func() time.Duration { return s.cfg.MaxResetTSGap.Duration })
	// Set up the Global TSO Allocator here, it will be initialized once this TSO participant campaigns leader successfully.
	s.tsoAllocatorManager.SetUpAllocator(s.ctx, tso.GlobalDCLocation, s.participant.GetLeadership())
+	s.tsoDispatcher = tsoutil.NewTSODispatcher(tsoProxyHandleDuration, tsoProxyBatchSize)
+	s.tsoProtoTSOFactory = &tsoutil.TSOProtoTSOFactory{}

	s.service = &Service{Server: s}
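The tsoutil package itself is among the changed files not expanded on this page. From the call sites above one can infer roughly the following surface; everything below is reconstruction from those call sites, and the names Request and ProtoFactory in particular are guesses:

package tsoutil // inferred surface only; the committed definitions are in the unexpanded files

import (
	"context"

	"github.com/pingcap/kvproto/pkg/tsopb"
	"github.com/prometheus/client_golang/prometheus"
	"google.golang.org/grpc"
)

// Request is a protocol-neutral forwarded TSO request (name assumed).
type Request interface{}

// ProtoFactory abstracts protocol-specific stream and request construction
// (name assumed).
type ProtoFactory interface{}

// TSOProtoTSOFactory is the tsopb flavor of the factory, matching the
// s.tsoProtoTSOFactory field added to the server.
type TSOProtoTSOFactory struct{}

// TSODispatcher batches and forwards TSO requests per forwarded host.
type TSODispatcher struct{} // fields omitted; not visible in this excerpt

// NewTSODispatcher takes the two histograms previously observed by
// processTSORequests: handle duration and batch size.
func NewTSODispatcher(tsoProxyHandleDuration, tsoProxyBatchSize prometheus.Histogram) *TSODispatcher {
	return &TSODispatcher{}
}

// NewTSOProtoTSORequest bundles one tsopb request with the delegate
// connection and the server stream it must be answered on.
func NewTSOProtoTSORequest(forwardedHost string, clientConn *grpc.ClientConn, request *tsopb.TsoRequest, stream tsopb.TSO_TsoServer) Request {
	return nil // placeholder body; the real constructor returns a concrete type
}

// DispatchRequest enqueues the request onto a per-host channel, starting a
// dispatcher goroutine for that host on first use (mirroring the removed
// dispatchTSORequest above).
func (d *TSODispatcher) DispatchRequest(ctx context.Context, req Request, protoFactory ProtoFactory, doneCh <-chan struct{}, errCh chan<- error) {
}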
