Skip to content

Commit

Permalink
client: fix the pd client could be blocked in some cases (#3283) (#3285)
Browse files Browse the repository at this point in the history
* cherry pick #3283 to release-4.0

Signed-off-by: ti-srebot <ti-srebot@pingcap.com>

* fmt

Signed-off-by: leoppro <zhaoyilin@pingcap.com>

Co-authored-by: leoppro <i@leop.pro>
Co-authored-by: Ti Prow Robot <71242396+ti-community-prow-bot@users.noreply.github.com>
  • Loading branch information
3 people authored Dec 23, 2020
1 parent 35e2d08 commit 703471b
Show file tree
Hide file tree
Showing 2 changed files with 45 additions and 10 deletions.
24 changes: 14 additions & 10 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,11 +122,12 @@ func WithGroup(group string) ScatterRegionOption {
}

type tsoRequest struct {
start time.Time
ctx context.Context
done chan error
physical int64
logical int64
start time.Time
clientCtx context.Context
requestCtx context.Context
done chan error
physical int64
logical int64
}

const (
Expand Down Expand Up @@ -325,7 +326,7 @@ func (c *client) tsLoop() {

func extractSpanReference(requests []*tsoRequest, opts []opentracing.StartSpanOption) []opentracing.StartSpanOption {
for _, req := range requests {
if span := opentracing.SpanFromContext(req.ctx); span != nil {
if span := opentracing.SpanFromContext(req.requestCtx); span != nil {
opts = append(opts, opentracing.ChildOf(span.Context()))
}
}
Expand Down Expand Up @@ -386,7 +387,7 @@ func tsLessEqual(physical, logical, thatPhysical, thatLogical int64) bool {

func (c *client) finishTSORequest(requests []*tsoRequest, physical, firstLogical int64, err error) {
for i := 0; i < len(requests); i++ {
if span := opentracing.SpanFromContext(requests[i].ctx); span != nil {
if span := opentracing.SpanFromContext(requests[i].requestCtx); span != nil {
span.Finish()
}
requests[i].physical, requests[i].logical = physical, firstLogical+int64(i)
Expand Down Expand Up @@ -439,8 +440,9 @@ func (c *client) GetTSAsync(ctx context.Context) TSFuture {
ctx = opentracing.ContextWithSpan(ctx, span)
}
req := tsoReqPool.Get().(*tsoRequest)
req.requestCtx = ctx
req.clientCtx = c.ctx
req.start = time.Now()
req.ctx = ctx
req.physical = 0
req.logical = 0
c.tsoRequests <- req
Expand Down Expand Up @@ -472,8 +474,10 @@ func (req *tsoRequest) Wait() (physical int64, logical int64, err error) {
cmdDurationWait.Observe(now.Sub(start).Seconds())
cmdDurationTSO.Observe(now.Sub(req.start).Seconds())
return
case <-req.ctx.Done():
return 0, 0, errors.WithStack(req.ctx.Err())
case <-req.requestCtx.Done():
return 0, 0, errors.WithStack(req.requestCtx.Err())
case <-req.clientCtx.Done():
return 0, 0, errors.WithStack(req.clientCtx.Err())
}
}

Expand Down
31 changes: 31 additions & 0 deletions client/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"time"

. "github.com/pingcap/check"
"github.com/pingcap/errors"
"github.com/pingcap/kvproto/pkg/pdpb"
"github.com/tikv/pd/pkg/testutil"
"go.uber.org/goleak"
Expand Down Expand Up @@ -103,3 +104,33 @@ func (s *testClientDialOptionSuite) TestGRPCDialOption(c *C) {
c.Assert(err, NotNil)
c.Assert(time.Since(start), Greater, 500*time.Millisecond)
}

var _ = Suite(&testTsoRequestSuite{})

type testTsoRequestSuite struct{}

func (s *testTsoRequestSuite) TestTsoRequestWait(c *C) {
ctx, cancel := context.WithCancel(context.Background())
req := &tsoRequest{
done: make(chan error, 1),
physical: 0,
logical: 0,
requestCtx: context.TODO(),
clientCtx: ctx,
}
cancel()
_, _, err := req.Wait()
c.Assert(errors.Cause(err), Equals, context.Canceled)

ctx, cancel = context.WithCancel(context.Background())
req = &tsoRequest{
done: make(chan error, 1),
physical: 0,
logical: 0,
requestCtx: ctx,
clientCtx: context.TODO(),
}
cancel()
_, _, err = req.Wait()
c.Assert(errors.Cause(err), Equals, context.Canceled)
}

0 comments on commit 703471b

Please sign in to comment.