rebase and renaming
Signed-off-by: Bin Shi <binshi.bing@gmail.com>
binshi-bing committed Mar 24, 2023
1 parent cbd06ca commit fd2433b
Showing 13 changed files with 119 additions and 110 deletions.
8 changes: 4 additions & 4 deletions client/grpcutil/grpcutil.go
@@ -90,12 +90,12 @@ func GetOrCreateGRPCConn(ctx context.Context, clientConns *sync.Map, addr string
if err != nil {
return nil, err
}
old, ok := clientConns.LoadOrStore(addr, cc)
if !ok {
existing, loaded := clientConns.LoadOrStore(addr, cc)
if !loaded {
// Successfully stored the connection.
return cc, nil
}
cc.Close()
log.Debug("use old connection", zap.String("target", cc.Target()), zap.String("state", cc.GetState().String()))
return old.(*grpc.ClientConn), nil
log.Debug("use existing connection", zap.String("target", cc.Target()), zap.String("state", cc.GetState().String()))
return existing.(*grpc.ClientConn), nil
}
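
The renamed return values above follow the sync.Map.LoadOrStore convention of (value, loaded). For reference, a minimal sketch of the same get-or-create pattern, with a stand-in conn type in place of *grpc.ClientConn and a hypothetical getOrCreate helper (not the PD client's actual API): dial optimistically, try to publish the connection, and close the local copy if another goroutine stored one first.

package main

import (
    "fmt"
    "sync"
)

// conn stands in for *grpc.ClientConn in this sketch.
type conn struct{ target string }

func (c *conn) Close() { fmt.Println("closing duplicate connection to", c.target) }

// getOrCreate returns the cached connection for addr, creating one if needed.
// If two goroutines race, exactly one connection is kept; the loser closes its copy.
func getOrCreate(cache *sync.Map, addr string) *conn {
    cc := &conn{target: addr} // created optimistically, like the dial above
    existing, loaded := cache.LoadOrStore(addr, cc)
    if !loaded {
        // We published our connection; keep it.
        return cc
    }
    // Another goroutine stored a connection first; drop ours and reuse theirs.
    cc.Close()
    return existing.(*conn)
}

func main() {
    var cache sync.Map
    a := getOrCreate(&cache, "pd-1:2379")
    b := getOrCreate(&cache, "pd-1:2379")
    fmt.Println(a == b) // true: both callers share the cached connection
}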
8 changes: 4 additions & 4 deletions client/tso_batch_controller.go
@@ -44,16 +44,16 @@ func newTSOBatchController(tsoRequestCh chan *tsoRequest, maxBatchSize int) *tso
// fetchPendingRequests will start a new round of the batch collecting from the channel.
// It returns true if everything goes well, otherwise false which means we should stop the service.
func (tbc *tsoBatchController) fetchPendingRequests(ctx context.Context, maxBatchWaitInterval time.Duration) error {
var firstTSORequest *tsoRequest
var firstRequest *tsoRequest
select {
case <-ctx.Done():
return ctx.Err()
case firstTSORequest = <-tbc.tsoRequestCh:
case firstRequest = <-tbc.tsoRequestCh:
}
// Start to batch when the first TSO request arrives.
tbc.batchStartTime = time.Now()
tbc.collectedRequestCount = 0
tbc.pushRequest(firstTSORequest)
tbc.pushRequest(firstRequest)

// This loop is for trying best to collect more requests, so we use `tbc.maxBatchSize` here.
fetchPendingRequestsLoop:
@@ -130,7 +130,7 @@ func (tbc *tsoBatchController) adjustBestBatchSize() {
}
}

func (tbc *tsoBatchController) revokePendingTSORequest(err error) {
func (tbc *tsoBatchController) revokePendingRequest(err error) {
for i := 0; i < len(tbc.tsoRequestCh); i++ {
req := <-tbc.tsoRequestCh
req.done <- err
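fetchPendingRequests above follows a common batching pattern: block until the first request arrives, record the batch start time, then opportunistically collect more requests (the truncated fetchPendingRequestsLoop). A minimal sketch of that pattern with a stand-in request type, leaving out the context cancellation and max-wait-interval handling the real code has:

package main

import "fmt"

type request struct{ id int }

// collectBatch blocks for the first request, then greedily collects more
// without blocking, up to maxBatch requests in total.
func collectBatch(reqCh chan *request, maxBatch int) []*request {
    batch := make([]*request, 0, maxBatch)
    batch = append(batch, <-reqCh) // start batching when the first request arrives

    for len(batch) < maxBatch {
        select {
        case req := <-reqCh:
            batch = append(batch, req)
        default:
            return batch // channel drained, stop collecting
        }
    }
    return batch
}

func main() {
    ch := make(chan *request, 8)
    for i := 0; i < 3; i++ {
        ch <- &request{id: i}
    }
    fmt.Println("batched", len(collectBatch(ch, 5)), "requests") // batched 3 requests
}
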
2 changes: 1 addition & 1 deletion client/tso_client.go
@@ -141,7 +141,7 @@ func (c *tsoClient) Close() {
if dispatcherInterface != nil {
dispatcher := dispatcherInterface.(*tsoDispatcher)
tsoErr := errors.WithStack(errClosing)
dispatcher.tsoBatchController.revokePendingTSORequest(tsoErr)
dispatcher.tsoBatchController.revokePendingRequest(tsoErr)
dispatcher.dispatcherCancel()
}
return true
19 changes: 10 additions & 9 deletions client/tso_dispatcher.go
@@ -404,7 +404,7 @@ tsoBatchLoop:
err = errs.ErrClientCreateTSOStream.FastGenByArgs(errs.RetryTimeoutErr)
log.Error("[tso] create tso stream error", zap.String("dc-location", dc), errs.ZapError(err))
c.svcDiscovery.ScheduleCheckMemberChanged()
c.finishTSORequest(tbc.getCollectedRequests(), 0, 0, 0, errors.WithStack(err))
c.finishRequest(tbc.getCollectedRequests(), 0, 0, 0, errors.WithStack(err))
continue tsoBatchLoop
case <-time.After(retryInterval):
continue streamChoosingLoop
@@ -440,7 +440,7 @@ tsoBatchLoop:
case tsDeadlineCh.(chan deadline) <- dl:
}
opts = extractSpanReference(tbc, opts[:0])
err = c.processTSORequests(stream, dc, tbc, opts)
err = c.processRequests(stream, dc, tbc, opts)
close(done)
// If error happens during tso stream handling, reset stream and run the next trial.
if err != nil {
@@ -691,23 +691,23 @@ func extractSpanReference(tbc *tsoBatchController, opts []opentracing.StartSpanO
return opts
}

func (c *tsoClient) processTSORequests(stream tsoStream, dcLocation string, tbc *tsoBatchController, opts []opentracing.StartSpanOption) error {
func (c *tsoClient) processRequests(stream tsoStream, dcLocation string, tbc *tsoBatchController, opts []opentracing.StartSpanOption) error {
if len(opts) > 0 {
span := opentracing.StartSpan("pdclient.processTSORequests", opts...)
span := opentracing.StartSpan("pdclient.processRequests", opts...)
defer span.Finish()
}

requests := tbc.getCollectedRequests()
count := int64(len(requests))
physical, logical, suffixBits, err := stream.processRequests(c.svcDiscovery.GetClusterID(), dcLocation, requests, tbc.batchStartTime)
if err != nil {
c.finishTSORequest(requests, 0, 0, 0, err)
c.finishRequest(requests, 0, 0, 0, err)
return err
}
// `logical` is the largest ts's logical part here, we need to do the subtracting before we finish each TSO request.
firstLogical := addLogical(logical, -count+1, suffixBits)
c.compareAndSwapTS(dcLocation, physical, firstLogical, suffixBits, count)
c.finishTSORequest(requests, physical, firstLogical, suffixBits, nil)
c.finishRequest(requests, physical, firstLogical, suffixBits, nil)
return nil
}

@@ -729,8 +729,9 @@ func (c *tsoClient) compareAndSwapTS(dcLocation string, physical, firstLogical i
lastTSOPointer := lastTSOInterface.(*lastTSO)
lastPhysical := lastTSOPointer.physical
lastLogical := lastTSOPointer.logical
// The TSO we get is a range like [largestLogical-count+1, largestLogical], so we save the last TSO's largest logical to compare with the new TSO's first logical.
// For example, if we have a TSO resp with logical 10, count 5, then all TSOs we get will be [6, 7, 8, 9, 10].
// The TSO we get is a range like [largestLogical-count+1, largestLogical], so we save the last TSO's largest logical
// to compare with the new TSO's first logical. For example, if we have a TSO resp with logical 10, count 5, then
// all TSOs we get will be [6, 7, 8, 9, 10].
if tsLessEqual(physical, firstLogical, lastPhysical, lastLogical) {
panic(errors.Errorf("%s timestamp fallback, newly acquired ts (%d, %d) is less or equal to last one (%d, %d)",
dcLocation, physical, firstLogical, lastPhysical, lastLogical))
@@ -747,7 +748,7 @@ func tsLessEqual(physical, logical, thatPhysical, thatLogical int64) bool {
return physical < thatPhysical
}

func (c *tsoClient) finishTSORequest(requests []*tsoRequest, physical, firstLogical int64, suffixBits uint32, err error) {
func (c *tsoClient) finishRequest(requests []*tsoRequest, physical, firstLogical int64, suffixBits uint32, err error) {
for i := 0; i < len(requests); i++ {
if span := opentracing.SpanFromContext(requests[i].requestCtx); span != nil {
span.Finish()
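The comment above (largest logical 10, count 5, TSOs [6, 7, 8, 9, 10]) can be checked with a small worked example. addLogical mirrors the helper shown in this diff; suffixBits is assumed to be 0 here purely for readability:

package main

import "fmt"

// addLogical mirrors the client helper: the count is shifted by suffixBits
// before being added to the logical part.
func addLogical(logical, count int64, suffixBits uint32) int64 {
    return logical + count<<suffixBits
}

func main() {
    var (
        logical    int64  = 10 // largest logical in the TSO response
        count      int64  = 5  // number of batched requests
        suffixBits uint32 = 0  // assumed zero in this sketch
    )
    firstLogical := addLogical(logical, -count+1, suffixBits)
    tsos := make([]int64, 0, count)
    for i := int64(0); i < count; i++ {
        tsos = append(tsos, firstLogical+i)
    }
    fmt.Println(tsos) // [6 7 8 9 10]
}
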
6 changes: 4 additions & 2 deletions client/tso_stream.go
@@ -143,7 +143,8 @@ func (s *pdTSOStream) processRequests(clusterID uint64, dcLocation string, reque
return
}

physical, logical, suffixBits = resp.GetTimestamp().GetPhysical(), resp.GetTimestamp().GetLogical(), resp.GetTimestamp().GetSuffixBits()
ts := resp.GetTimestamp()
physical, logical, suffixBits = ts.GetPhysical(), ts.GetLogical(), ts.GetSuffixBits()
return
}

@@ -189,6 +190,7 @@ func (s *tsoTSOStream) processRequests(clusterID uint64, dcLocation string, requ
return
}

physical, logical, suffixBits = resp.GetTimestamp().GetPhysical(), resp.GetTimestamp().GetLogical(), resp.GetTimestamp().GetSuffixBits()
ts := resp.GetTimestamp()
physical, logical, suffixBits = ts.GetPhysical(), ts.GetLogical(), ts.GetSuffixBits()
return
}
6 changes: 3 additions & 3 deletions pkg/mcs/tso/server/grpc_service.go
@@ -118,8 +118,8 @@ func (s *Service) Tso(stream tsopb.TSO_TsoServer) error {
errCh = make(chan error)
}

tsoProtoFactory := s.TSOProtoFactory
tsoRequest := tsoutil.NewTSOProtoTSORequest(forwardedHost, clientConn, request, stream)
tsoProtoFactory := s.tsoProtoFactory
tsoRequest := tsoutil.NewTSOProtoRequest(forwardedHost, clientConn, request, stream)
s.tsoDispatcher.DispatchRequest(ctx, tsoRequest, tsoProtoFactory, doneCh, errCh)
continue
}
@@ -133,7 +133,7 @@ func (s *Service) Tso(stream tsopb.TSO_TsoServer) error {
return status.Errorf(codes.FailedPrecondition, "mismatch cluster id, need %d but got %d", s.clusterID, request.GetHeader().GetClusterId())
}
count := request.GetCount()
ts, err := s.tsoAllocatorManager.HandleTSORequest(request.GetDcLocation(), count)
ts, err := s.tsoAllocatorManager.HandleRequest(request.GetDcLocation(), count)
if err != nil {
return status.Errorf(codes.Unknown, err.Error())
}
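The forwarding path above hands each request to the TSODispatcher, which (as the tsoutil diff further down shows) lazily creates one channel and one forwarding goroutine per forwarded host via sync.Map.LoadOrStore. A minimal, self-contained sketch of that per-destination dispatch pattern; the request type, dispatch function, and channel capacity are stand-ins, and the real dispatcher batches and forwards over a gRPC stream rather than printing:

package main

import (
    "fmt"
    "sync"
    "time"
)

type request struct{ host, payload string }

var dispatchChs sync.Map // map[string]chan *request

// dispatch enqueues req for its destination host, starting a dedicated
// forwarding goroutine the first time that host is seen.
func dispatch(req *request) {
    val, loaded := dispatchChs.LoadOrStore(req.host, make(chan *request, 1024))
    ch := val.(chan *request)
    if !loaded {
        go func(host string, ch chan *request) {
            for r := range ch {
                fmt.Println("forwarding to", host, ":", r.payload)
            }
        }(req.host, ch)
    }
    ch <- req
}

func main() {
    dispatch(&request{host: "tso-1:3379", payload: "tso request 1"})
    dispatch(&request{host: "tso-1:3379", payload: "tso request 2"})
    time.Sleep(100 * time.Millisecond) // crude wait so the sketch's goroutine can drain
}
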
12 changes: 6 additions & 6 deletions pkg/mcs/tso/server/server.go
@@ -111,12 +111,12 @@ type Server struct {
tsoAllocatorManager *tso.AllocatorManager
// Store as map[string]*grpc.ClientConn
clientConns sync.Map
// tsoDispatcher is used to dispatch different TSO requests to
// the corresponding forwarding TSO channel.
// tsoDispatcher is used to dispatch the TSO requests to
// the corresponding forwarding TSO channels.
tsoDispatcher *tsoutil.TSODispatcher
// TSOProtoFactory is the abstract factory for creating tso
// related data structures defined in pd protocol
TSOProtoFactory *tsoutil.TSOProtoFactory
// tsoProtoFactory is the abstract factory for creating tso
// related data structures defined in the tso grpc protocol
tsoProtoFactory *tsoutil.TSOProtoFactory

// Callback functions for different stages
// startCallbacks will be called after the server is started.
@@ -562,7 +562,7 @@ func (s *Server) startServer() (err error) {
// Set up the Global TSO Allocator here, it will be initialized once this TSO participant campaigns leader successfully.
s.tsoAllocatorManager.SetUpAllocator(s.ctx, tso.GlobalDCLocation, s.participant.GetLeadership())
s.tsoDispatcher = tsoutil.NewTSODispatcher(tsoProxyHandleDuration, tsoProxyBatchSize)
s.TSOProtoFactory = &tsoutil.TSOProtoFactory{}
s.tsoProtoFactory = &tsoutil.TSOProtoFactory{}

s.service = &Service{Server: s}

4 changes: 2 additions & 2 deletions pkg/tso/allocator_manager.go
@@ -983,8 +983,8 @@ func (am *AllocatorManager) deleteAllocatorGroup(dcLocation string) {
}
}

// HandleTSORequest forwards TSO allocation requests to correct TSO Allocators.
func (am *AllocatorManager) HandleTSORequest(dcLocation string, count uint32) (pdpb.Timestamp, error) {
// HandleRequest forwards TSO allocation requests to correct TSO Allocators.
func (am *AllocatorManager) HandleRequest(dcLocation string, count uint32) (pdpb.Timestamp, error) {
if dcLocation == "" {
dcLocation = GlobalDCLocation
}
50 changes: 30 additions & 20 deletions pkg/utils/tsoutil/tso_dispatcher.go
@@ -28,7 +28,7 @@ import (
)

const (
maxMergeTSORequests = 10000
maxMergeRequests = 10000
// DefaultTSOProxyTimeout defines the default timeout value of TSP Proxying
DefaultTSOProxyTimeout = 3 * time.Second
)
@@ -37,13 +37,13 @@ type tsoResp interface {
GetTimestamp() *pdpb.Timestamp
}

// TSODispatcher is used to dispatch different TSO requests to the corresponding forwarding TSO channel.
// TSODispatcher dispatches the TSO requests to the corresponding forwarding TSO channels.
type TSODispatcher struct {
tsoProxyHandleDuration prometheus.Histogram
tsoProxyBatchSize prometheus.Histogram

// dispatchChs is used to dispatch different TSO requests to the corresponding forwarding TSO channel.
dispatchChs sync.Map // Store as map[string]chan TSORequest
// dispatchChs is used to dispatch different TSO requests to the corresponding forwarding TSO channels.
dispatchChs sync.Map // Store as map[string]chan Request
}

// NewTSODispatcher creates and returns a TSODispatcher
@@ -56,26 +56,31 @@ func NewTSODispatcher(tsoProxyHandleDuration, tsoProxyBatchSize prometheus.Histo
}

// DispatchRequest is the entry point for dispatching/forwarding a tso request to the detination host
func (s *TSODispatcher) DispatchRequest(ctx context.Context, req TSORequest, tsoProtoFactory ProtoFactory, doneCh <-chan struct{}, errCh chan<- error) {
tsoRequestChInterface, loaded := s.dispatchChs.LoadOrStore(req.getForwardedHost(), make(chan TSORequest, maxMergeTSORequests))
func (s *TSODispatcher) DispatchRequest(
ctx context.Context, req Request, tsoProtoFactory ProtoFactory, doneCh <-chan struct{}, errCh chan<- error) {
val, loaded := s.dispatchChs.LoadOrStore(req.getForwardedHost(), make(chan Request, maxMergeRequests))
reqCh := val.(chan Request)
if !loaded {
tsDeadlineCh := make(chan deadline, 1)
go s.handleDispatcher(ctx, tsoProtoFactory, req.getForwardedHost(), req.getClientConn(), tsoRequestChInterface.(chan TSORequest), tsDeadlineCh, doneCh, errCh)
go s.dispatch(ctx, tsoProtoFactory, req.getForwardedHost(), req.getClientConn(), reqCh, tsDeadlineCh, doneCh, errCh)
go watchTSDeadline(ctx, tsDeadlineCh)
}
tsoRequestChInterface.(chan TSORequest) <- req
reqCh <- req
}

func (s *TSODispatcher) handleDispatcher(ctx context.Context, tsoProtoFactory ProtoFactory, forwardedHost string, clientConn *grpc.ClientConn,
tsoRequestCh <-chan TSORequest, tsDeadlineCh chan<- deadline, doneCh <-chan struct{}, errCh chan<- error) {
func (s *TSODispatcher) dispatch(
ctx context.Context, tsoProtoFactory ProtoFactory, forwardedHost string, clientConn *grpc.ClientConn,
tsoRequestCh <-chan Request, tsDeadlineCh chan<- deadline, doneCh <-chan struct{}, errCh chan<- error) {
dispatcherCtx, ctxCancel := context.WithCancel(ctx)
defer ctxCancel()
defer s.dispatchChs.Delete(forwardedHost)

log.Info("create tso forward stream", zap.String("forwarded-host", forwardedHost))
forwardStream, cancel, err := tsoProtoFactory.createForwardStream(ctx, clientConn)
if err != nil || forwardStream == nil {
log.Error("create tso forwarding stream error", zap.String("forwarded-host", forwardedHost), errs.ZapError(errs.ErrGRPCCreateStream, err))
log.Error("create tso forwarding stream error",
zap.String("forwarded-host", forwardedHost),
errs.ZapError(errs.ErrGRPCCreateStream, err))
select {
case <-dispatcherCtx.Done():
return
Expand All @@ -90,7 +95,7 @@ func (s *TSODispatcher) handleDispatcher(ctx context.Context, tsoProtoFactory Pr
}
defer cancel()

requests := make([]TSORequest, maxMergeTSORequests+1)
requests := make([]Request, maxMergeRequests+1)
for {
select {
case first := <-tsoRequestCh:
@@ -110,10 +115,12 @@ func (s *TSODispatcher) handleDispatcher(ctx context.Context, tsoProtoFactory Pr
case <-dispatcherCtx.Done():
return
}
err = s.processTSORequests(forwardStream, requests[:pendingTSOReqCount], tsoProtoFactory)
err = s.processRequests(forwardStream, requests[:pendingTSOReqCount], tsoProtoFactory)
close(done)
if err != nil {
log.Error("proxy forward tso error", zap.String("forwarded-host", forwardedHost), errs.ZapError(errs.ErrGRPCSend, err))
log.Error("proxy forward tso error",
zap.String("forwarded-host", forwardedHost),
errs.ZapError(errs.ErrGRPCSend, err))
select {
case <-dispatcherCtx.Done():
return
@@ -132,34 +139,36 @@ func (s *TSODispatcher) handleDispatcher(ctx context.Context, tsoProtoFactory Pr
}
}

func (s *TSODispatcher) processTSORequests(forwardStream tsoStream, requests []TSORequest, tsoProtoFactory ProtoFactory) error {
start := time.Now()
func (s *TSODispatcher) processRequests(forwardStream stream, requests []Request, tsoProtoFactory ProtoFactory) error {
// Merge the requests
count := uint32(0)
for _, request := range requests {
count += request.getCount()
}

start := time.Now()
resp, err := requests[0].process(forwardStream, count, tsoProtoFactory)
if err != nil {
return err
}
s.tsoProxyHandleDuration.Observe(time.Since(start).Seconds())
s.tsoProxyBatchSize.Observe(float64(count))
// Split the response
physical, logical, suffixBits := resp.GetTimestamp().GetPhysical(), resp.GetTimestamp().GetLogical(), resp.GetTimestamp().GetSuffixBits()
ts := resp.GetTimestamp()
physical, logical, suffixBits := ts.GetPhysical(), ts.GetLogical(), ts.GetSuffixBits()
// `logical` is the largest ts's logical part here, we need to do the subtracting before we finish each TSO request.
// This is different from the logic of client batch, for example, if we have a largest ts whose logical part is 10,
// count is 5, then the splitting results should be 5 and 10.
firstLogical := addLogical(logical, -int64(count), suffixBits)
return s.finishTSORequest(requests, physical, firstLogical, suffixBits)
return s.finishRequest(requests, physical, firstLogical, suffixBits)
}

// Because of the suffix, we need to shift the count before we add it to the logical part.
func addLogical(logical, count int64, suffixBits uint32) int64 {
return logical + count<<suffixBits
}

func (s *TSODispatcher) finishTSORequest(requests []TSORequest, physical, firstLogical int64, suffixBits uint32) error {
func (s *TSODispatcher) finishRequest(requests []Request, physical, firstLogical int64, suffixBits uint32) error {
countSum := int64(0)
for i := 0; i < len(requests); i++ {
newCountSum, err := requests[i].postProcess(countSum, physical, firstLogical, suffixBits)
@@ -185,7 +194,8 @@ func watchTSDeadline(ctx context.Context, tsDeadlineCh <-chan deadline) {
case d := <-tsDeadlineCh:
select {
case <-d.timer:
log.Error("tso proxy request processing is canceled due to timeout", errs.ZapError(errs.ErrProxyTSOTimeout))
log.Error("tso proxy request processing is canceled due to timeout",
errs.ZapError(errs.ErrProxyTSOTimeout))
d.cancel()
case <-d.done:
continue
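The proxy-side split in processRequests above differs from the client batch by one: firstLogical is the largest logical minus the full count, so for a largest logical of 10 and a count of 5 the split points are 5 and 10, as the comment says, and each merged request then takes its own slice of (5, 10]. A worked sketch, again assuming suffixBits is 0 and a hypothetical 2 + 3 split of the merged count:

package main

import "fmt"

// addLogical mirrors the tsoutil helper: the count is shifted by suffixBits
// before being added to the logical part.
func addLogical(logical, count int64, suffixBits uint32) int64 {
    return logical + count<<suffixBits
}

func main() {
    var (
        logical    int64  = 10 // largest logical returned for the merged request
        count      int64  = 5  // total count across the merged proxy requests
        suffixBits uint32 = 0  // assumed zero in this sketch
    )
    firstLogical := addLogical(logical, -count, suffixBits)
    fmt.Println(firstLogical, logical) // 5 10

    // Hypothetical split: two merged requests with counts 2 and 3 end at
    // logical 7 and logical 10 respectively.
    countSum := int64(0)
    for _, c := range []int64{2, 3} {
        countSum += c
        fmt.Println("request ends at logical", addLogical(firstLogical, countSum, suffixBits))
    }
}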