Skip to content

Commit d4c158d

Browse files
lysujackysp
authored andcommitted
tikv: avoid switch peer when batchRequest be cancelled (#10822) (#10850)
1 parent c8636b5 commit d4c158d

File tree

2 files changed

+22
-6
lines changed

2 files changed

+22
-6
lines changed

store/tikv/client.go

+6-6
Original file line numberDiff line numberDiff line change
@@ -35,10 +35,8 @@ import (
3535
"github.com/pingcap/tidb/util/logutil"
3636
"go.uber.org/zap"
3737
"google.golang.org/grpc"
38-
gcodes "google.golang.org/grpc/codes"
3938
"google.golang.org/grpc/credentials"
4039
"google.golang.org/grpc/keepalive"
41-
gstatus "google.golang.org/grpc/status"
4240
)
4341

4442
// MaxSendMsgSize set max gRPC request message size sent to server. If any request message size is larger than
@@ -618,8 +616,9 @@ func sendBatchRequest(
618616
select {
619617
case connArray.batchCommandsCh <- entry:
620618
case <-ctx1.Done():
621-
logutil.Logger(context.Background()).Warn("send request is timeout", zap.String("to", addr))
622-
return nil, errors.Trace(gstatus.Error(gcodes.DeadlineExceeded, "Canceled or timeout"))
619+
logutil.Logger(context.Background()).Warn("send request is cancelled",
620+
zap.String("to", addr), zap.String("cause", ctx1.Err().Error()))
621+
return nil, errors.Trace(ctx1.Err())
623622
}
624623

625624
select {
@@ -630,8 +629,9 @@ func sendBatchRequest(
630629
return tikvrpc.FromBatchCommandsResponse(res), nil
631630
case <-ctx1.Done():
632631
atomic.StoreInt32(&entry.canceled, 1)
633-
logutil.Logger(context.Background()).Warn("send request is canceled", zap.String("to", addr))
634-
return nil, errors.Trace(gstatus.Error(gcodes.DeadlineExceeded, "Canceled or timeout"))
632+
logutil.Logger(context.Background()).Warn("wait response is cancelled",
633+
zap.String("to", addr), zap.String("cause", ctx1.Err().Error()))
634+
return nil, errors.Trace(ctx1.Err())
635635
}
636636
}
637637

store/tikv/client_test.go

+16
Original file line numberDiff line numberDiff line change
@@ -14,9 +14,12 @@
1414
package tikv
1515

1616
import (
17+
"context"
1718
"testing"
19+
"time"
1820

1921
. "github.com/pingcap/check"
22+
"github.com/pingcap/errors"
2023
"github.com/pingcap/kvproto/pkg/tikvpb"
2124
"github.com/pingcap/tidb/config"
2225
)
@@ -77,3 +80,16 @@ func (s *testClientSuite) TestRemoveCanceledRequests(c *C) {
7780
newEntryPtr := &entries[0]
7881
c.Assert(entryPtr, Equals, newEntryPtr)
7982
}
83+
84+
func (s *testClientSuite) TestCancelTimeoutRetErr(c *C) {
85+
req := new(tikvpb.BatchCommandsRequest_Request)
86+
a := &connArray{batchCommandsCh: make(chan *batchCommandsEntry, 1)}
87+
88+
ctx, cancel := context.WithCancel(context.TODO())
89+
cancel()
90+
_, err := sendBatchRequest(ctx, "", a, req, 2*time.Second)
91+
c.Assert(errors.Cause(err), Equals, context.Canceled)
92+
93+
_, err = sendBatchRequest(context.Background(), "", a, req, 0)
94+
c.Assert(errors.Cause(err), Equals, context.DeadlineExceeded)
95+
}

0 commit comments

Comments
 (0)