mcs, tso: fix expensive async forwardTSORequest() and its timeout mechanism. #6664

Merged · 1 commit · Jun 26, 2023

33 changes: 22 additions & 11 deletions pkg/utils/tsoutil/tso_dispatcher.go
@@ -58,7 +58,7 @@ func NewTSODispatcher(tsoProxyHandleDuration, tsoProxyBatchSize prometheus.Histo
return tsoDispatcher
}

// DispatchRequest is the entry point for dispatching/forwarding a tso request to the detination host
// DispatchRequest is the entry point for dispatching/forwarding a tso request to the destination host
func (s *TSODispatcher) DispatchRequest(
ctx context.Context,
req Request,
@@ -69,9 +69,9 @@ func (s *TSODispatcher) DispatchRequest(
val, loaded := s.dispatchChs.LoadOrStore(req.getForwardedHost(), make(chan Request, maxMergeRequests))
reqCh := val.(chan Request)
if !loaded {
tsDeadlineCh := make(chan deadline, 1)
tsDeadlineCh := make(chan *TSDeadline, 1)
go s.dispatch(ctx, tsoProtoFactory, req.getForwardedHost(), req.getClientConn(), reqCh, tsDeadlineCh, doneCh, errCh, tsoPrimaryWatchers...)
go watchTSDeadline(ctx, tsDeadlineCh)
go WatchTSDeadline(ctx, tsDeadlineCh)
}
reqCh <- req
}
@@ -82,7 +82,7 @@ func (s *TSODispatcher) dispatch(
forwardedHost string,
clientConn *grpc.ClientConn,
tsoRequestCh <-chan Request,
tsDeadlineCh chan<- deadline,
tsDeadlineCh chan<- *TSDeadline,
doneCh <-chan struct{},
errCh chan<- error,
tsoPrimaryWatchers ...*etcdutil.LoopWatcher) {
@@ -121,11 +121,7 @@ func (s *TSODispatcher) dispatch(
requests[i] = <-tsoRequestCh
}
done := make(chan struct{})
dl := deadline{
timer: time.After(DefaultTSOProxyTimeout),
done: done,
cancel: cancel,
}
dl := NewTSDeadline(DefaultTSOProxyTimeout, done, cancel)
select {
case tsDeadlineCh <- dl:
case <-dispatcherCtx.Done():
@@ -199,13 +195,28 @@ func (s *TSODispatcher) finishRequest(requests []Request, physical, firstLogical
return nil
}

type deadline struct {
// TSDeadline is used to watch the deadline of each tso request.
type TSDeadline struct {
timer <-chan time.Time
done chan struct{}
cancel context.CancelFunc
}

func watchTSDeadline(ctx context.Context, tsDeadlineCh <-chan deadline) {
// NewTSDeadline creates a new TSDeadline.
func NewTSDeadline(
timeout time.Duration,
done chan struct{},
cancel context.CancelFunc,
) *TSDeadline {
return &TSDeadline{
timer: time.After(timeout),
done: done,
cancel: cancel,
}
}

// WatchTSDeadline watches the deadline of each tso request.
func WatchTSDeadline(ctx context.Context, tsDeadlineCh <-chan *TSDeadline) {
defer logutil.LogPanic()
ctx, cancel := context.WithCancel(ctx)
defer cancel()
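The body of WatchTSDeadline is elided in the diff above. As a rough sketch of the pattern it implements (the select logic below is an assumption, not copied from the PD source), a per-stream deadline watcher drains one deadline at a time and cancels the forward stream if a request's done channel is not closed before its timer fires:

```go
package sketch

import (
	"context"
	"log"
	"time"
)

// tsDeadline mirrors the exported TSDeadline shown in the diff above.
type tsDeadline struct {
	timer  <-chan time.Time
	done   chan struct{}
	cancel context.CancelFunc
}

// watchTSDeadlineSketch is a hypothetical illustration of what WatchTSDeadline
// does: one goroutine per forward stream watches every request's deadline in
// turn, instead of one timer goroutine per request.
func watchTSDeadlineSketch(ctx context.Context, tsDeadlineCh <-chan *tsDeadline) {
	for {
		select {
		case d := <-tsDeadlineCh:
			select {
			case <-d.timer:
				log.Println("tso proxy request timed out; canceling the forward stream")
				d.cancel()
			case <-d.done:
				// The request finished before its deadline; wait for the next one.
			case <-ctx.Done():
				return
			}
		case <-ctx.Done():
			return
		}
	}
}
```

In forwardTSO below, one such watcher is started per gRPC stream (go tsoutil.WatchTSDeadline(stream.Context(), tsDeadlineCh)) and receives one *TSDeadline per forwarded request.
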
122 changes: 59 additions & 63 deletions server/grpc_service.go
@@ -406,23 +406,27 @@ func (s *GrpcServer) forwardTSO(stream pdpb.PD_TsoServer) error {
var (
server = &tsoServer{stream: stream}
forwardStream tsopb.TSO_TsoClient
cancel context.CancelFunc
forwardCtx context.Context
cancelForward context.CancelFunc
lastForwardedHost string
)
defer func() {
s.concurrentTSOProxyStreamings.Add(-1)
// cancel the forward stream
if cancel != nil {
cancel()
if cancelForward != nil {
cancelForward()
}
}()

maxConcurrentTSOProxyStreamings := int32(s.GetMaxConcurrentTSOProxyStreamings())
if maxConcurrentTSOProxyStreamings >= 0 {
if newCount := s.concurrentTSOProxyStreamings.Add(1); newCount > maxConcurrentTSOProxyStreamings {
return errors.WithStack(ErrMaxCountTSOProxyRoutinesExceeded)
}
}

tsDeadlineCh := make(chan *tsoutil.TSDeadline, 1)
Review comment (Contributor):
How about using context.WithTimeout(forwardCtx, ...)?

Reply from @binshi-bing (Contributor Author), Jun 26, 2023:
We can't use context.WithTimeout() per grpc.ServerStream, because here we want a timeout mechanism that watches every request forwarded over the forward stream, not a deadline on the entire grpc.ServerStream. The existing logic uses context.WithTimeout() and then calls forwardTSORequest asynchronously, which is expensive, and the latency varies over a wide range due to the synchronization across multiple goroutines. This change fixes that problem, as described in the PR description ("What is changed and how does it work?"), quoted below:

"In order to handle the timeout case for forwardStream send/recv, the existing logic creates context.WithTimeout(forwardCtx, ...) for every request and then starts a new goroutine, forwardTSORequest, which is very expensive, as shown by the profiling in #6659. This change creates a watchDeadline routine per forward stream and reuses it for all requests forwarded on that stream, with forwardTSORequest called synchronously. Compared to the existing logic, the new approach is much cheaper and the latency is much more stable."
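For contrast, here is a minimal sketch of the per-request pattern the reply describes as expensive; the names (forwardOnce, send) are illustrative, not the actual PD code. Every forwarded request pays for its own timer, result channel, and goroutine, plus the channel handoff back to the caller:

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// forwardOnce is a hypothetical stand-in for the old per-request flow:
// a fresh context.WithTimeout, a fresh result channel, and a fresh goroutine
// for every single forwarded request.
func forwardOnce(ctx context.Context, send func(context.Context) (string, error)) (string, error) {
	ctxTimeout, cancel := context.WithTimeout(ctx, 3*time.Second)
	defer cancel()

	type result struct {
		resp string
		err  error
	}
	resCh := make(chan result, 1)
	go func() {
		// The actual Send/Recv work runs on its own goroutine per request.
		resp, err := send(ctxTimeout)
		resCh <- result{resp, err}
	}()

	select {
	case <-ctxTimeout.Done():
		return "", errors.New("forward timed out")
	case r := <-resCh:
		return r.resp, r.err
	}
}

func main() {
	resp, err := forwardOnce(context.Background(), func(ctx context.Context) (string, error) {
		return "tso response", nil // pretend this is forwardStream.Send + Recv
	})
	fmt.Println(resp, err)
}
```

The PR replaces this with a single WatchTSDeadline goroutine per forward stream, so the per-request cost drops to sending one *TSDeadline on a buffered channel and a close(done) once the synchronous forward returns.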

go tsoutil.WatchTSDeadline(stream.Context(), tsDeadlineCh)

for {
select {
case <-s.ctx.Done():
@@ -449,22 +453,24 @@ func (s *GrpcServer) forwardTSO(stream pdpb.PD_TsoServer) error {
return errors.WithStack(ErrNotFoundTSOAddr)
}
if forwardStream == nil || lastForwardedHost != forwardedHost {
if cancel != nil {
cancel()
if cancelForward != nil {
cancelForward()
}

clientConn, err := s.getDelegateClient(s.ctx, forwardedHost)
if err != nil {
return errors.WithStack(err)
}
forwardStream, cancel, err = s.createTSOForwardStream(clientConn)
forwardStream, forwardCtx, cancelForward, err =
s.createTSOForwardStream(stream.Context(), clientConn)
if err != nil {
return errors.WithStack(err)
}
lastForwardedHost = forwardedHost
}

tsopbResp, err := s.forwardTSORequestWithDeadLine(stream.Context(), request, forwardStream)
tsopbResp, err := s.forwardTSORequestWithDeadLine(
forwardCtx, cancelForward, forwardStream, request, tsDeadlineCh)
if err != nil {
return errors.WithStack(err)
}
@@ -502,37 +508,39 @@ func (s *GrpcServer) forwardTSO(stream pdpb.PD_TsoServer) error {
}

func (s *GrpcServer) forwardTSORequestWithDeadLine(
ctx context.Context, request *pdpb.TsoRequest, forwardStream tsopb.TSO_TsoClient,
forwardCtx context.Context,
cancelForward context.CancelFunc,
forwardStream tsopb.TSO_TsoClient,
request *pdpb.TsoRequest,
tsDeadlineCh chan<- *tsoutil.TSDeadline,
) (*tsopb.TsoResponse, error) {
defer logutil.LogPanic()
// Create a context with deadline for forwarding TSO request to TSO service.
ctxTimeout, cancel := context.WithTimeout(ctx, tsoutil.DefaultTSOProxyTimeout)
defer cancel()

tsoProxyBatchSize.Observe(float64(request.GetCount()))
done := make(chan struct{})
dl := tsoutil.NewTSDeadline(tsoutil.DefaultTSOProxyTimeout, done, cancelForward)
select {
case tsDeadlineCh <- dl:
case <-forwardCtx.Done():
return nil, forwardCtx.Err()
}

// used to receive the result from doSomething function
tsoRespCh := make(chan *tsopbTSOResponse, 1)
start := time.Now()
go s.forwardTSORequestAsync(ctxTimeout, request, forwardStream, tsoRespCh)
select {
case <-ctxTimeout.Done():
tsoProxyForwardTimeoutCounter.Inc()
return nil, ErrForwardTSOTimeout
case tsoResp := <-tsoRespCh:
if tsoResp.err == nil {
tsoProxyHandleDuration.Observe(time.Since(start).Seconds())
resp, err := s.forwardTSORequest(forwardCtx, request, forwardStream)
close(done)
if err != nil {
if strings.Contains(err.Error(), errs.NotLeaderErr) {
s.tsoPrimaryWatcher.ForceLoad()
}
return tsoResp.response, tsoResp.err
return nil, err
}
tsoProxyBatchSize.Observe(float64(request.GetCount()))
tsoProxyHandleDuration.Observe(time.Since(start).Seconds())
return resp, nil
}

func (s *GrpcServer) forwardTSORequestAsync(
ctxTimeout context.Context,
func (s *GrpcServer) forwardTSORequest(
ctx context.Context,
request *pdpb.TsoRequest,
forwardStream tsopb.TSO_TsoClient,
tsoRespCh chan<- *tsopbTSOResponse,
) {
) (*tsopb.TsoResponse, error) {
tsopbReq := &tsopb.TsoRequest{
Header: &tsopb.RequestHeader{
ClusterId: request.GetHeader().GetClusterId(),
@@ -545,46 +553,32 @@ func (s *GrpcServer) forwardTSORequestAsync(
}

failpoint.Inject("tsoProxySendToTSOTimeout", func() {
<-ctxTimeout.Done()
failpoint.Return()
// block until watchDeadline routine cancels the context.
<-ctx.Done()
})

if err := forwardStream.Send(tsopbReq); err != nil {
select {
case <-ctxTimeout.Done():
return
case tsoRespCh <- &tsopbTSOResponse{err: err}:
}
return
}

select {
case <-ctxTimeout.Done():
return
case <-ctx.Done():
return nil, ctx.Err()
default:
}

if err := forwardStream.Send(tsopbReq); err != nil {
return nil, err
}

failpoint.Inject("tsoProxyRecvFromTSOTimeout", func() {
<-ctxTimeout.Done()
failpoint.Return()
// block until watchDeadline routine cancels the context.
<-ctx.Done()
})

response, err := forwardStream.Recv()
if err != nil {
if strings.Contains(err.Error(), errs.NotLeaderErr) {
s.tsoPrimaryWatcher.ForceLoad()
}
}
select {
case <-ctxTimeout.Done():
return
case tsoRespCh <- &tsopbTSOResponse{response: response, err: err}:
case <-ctx.Done():
return nil, ctx.Err()
default:
}
}

type tsopbTSOResponse struct {
response *tsopb.TsoResponse
err error
return forwardStream.Recv()
}

// tsoServer wraps PD_TsoServer to ensure when any error
@@ -2140,13 +2134,15 @@ func forwardRegionHeartbeatClientToServer(forwardStream pdpb.PD_RegionHeartbeatC
}
}

func (s *GrpcServer) createTSOForwardStream(client *grpc.ClientConn) (tsopb.TSO_TsoClient, context.CancelFunc, error) {
func (s *GrpcServer) createTSOForwardStream(
ctx context.Context, client *grpc.ClientConn,
) (tsopb.TSO_TsoClient, context.Context, context.CancelFunc, error) {
done := make(chan struct{})
ctx, cancel := context.WithCancel(s.ctx)
go checkStream(ctx, cancel, done)
forwardStream, err := tsopb.NewTSOClient(client).Tso(ctx)
forwardCtx, cancelForward := context.WithCancel(ctx)
go checkStream(forwardCtx, cancelForward, done)
forwardStream, err := tsopb.NewTSOClient(client).Tso(forwardCtx)
done <- struct{}{}
return forwardStream, cancel, err
return forwardStream, forwardCtx, cancelForward, err
}

func (s *GrpcServer) createReportBucketsForwardStream(client *grpc.ClientConn) (pdpb.PD_ReportBucketsClient, context.CancelFunc, error) {