
store/copr: handle region error from client #39838

Merged (12 commits) on Dec 13, 2022
189 changes: 152 additions & 37 deletions store/copr/coprocessor.go
@@ -140,7 +140,7 @@ func (c *CopClient) BuildCopIterator(ctx context.Context, req *kv.Request, vars
// disable batch copr for follower read
req.StoreBatchSize = 0
}
- // disable paging for batch copr
+ // disable batch copr when paging is enabled.
if req.Paging.Enable {
req.StoreBatchSize = 0
}
@@ -315,13 +315,13 @@ func buildCopTasks(bo *Backoffer, cache *RegionCache, ranges *KeyRanges, req *kv
chanSize = 18
}

- tasks := make([]*copTask, 0, len(locs))
- origRangeIdx := 0
- taskID := uint64(0)
- var store2Idx map[uint64]int
+ var builder taskBuilder
if req.StoreBatchSize > 0 {
- store2Idx = make(map[uint64]int, 16)
+ builder = newBatchTaskBuilder(bo, req, cache)
+ } else {
+ builder = newLegacyTaskBuilder(len(locs))
}
+ origRangeIdx := 0
for _, loc := range locs {
// TiKV returns a gRPC error if the message is too large, so we limit the length of the ranges slice
// to make sure the message can be sent successfully.
@@ -357,7 +357,6 @@ func buildCopTasks(bo *Backoffer, cache *RegionCache, ranges *KeyRanges, req *kv
}
}
task := &copTask{
- taskID: taskID,
region: loc.Location.Region,
bucketsVer: loc.getBucketVersion(),
ranges: loc.Ranges.Slice(i, nextI),
@@ -370,50 +369,138 @@ func buildCopTasks(bo *Backoffer, cache *RegionCache, ranges *KeyRanges, req *kv
requestSource: req.RequestSource,
RowCountHint: hint,
}
- if req.StoreBatchSize > 0 {
- batchedTask, err := cache.BuildBatchTask(bo, task, req.ReplicaRead)
- if err != nil {
- return nil, err
- }
- if idx, ok := store2Idx[batchedTask.storeID]; !ok || len(tasks[idx].batchTaskList) >= req.StoreBatchSize {
- tasks = append(tasks, batchedTask.task)
- store2Idx[batchedTask.storeID] = len(tasks) - 1
- } else {
- if tasks[idx].batchTaskList == nil {
- tasks[idx].batchTaskList = make(map[uint64]*batchedCopTask, req.StoreBatchSize)
- // disable paging for batched task.
- tasks[idx].paging = false
- tasks[idx].pagingSize = 0
- }
- if task.RowCountHint > 0 {
- tasks[idx].RowCountHint += task.RowCountHint
- }
- tasks[idx].batchTaskList[taskID] = batchedTask
- }
- } else {
- tasks = append(tasks, task)
+ if err = builder.handle(task); err != nil {
+ return nil, err
}
i = nextI
if req.Paging.Enable {
pagingSize = paging.GrowPagingSize(pagingSize, req.Paging.MaxPagingSize)
}
- taskID++
}
}

if req.Desc {
- reverseTasks(tasks)
+ builder.reverse()
}
+ tasks := builder.build()
if elapsed := time.Since(start); elapsed > time.Millisecond*500 {
logutil.BgLogger().Warn("buildCopTasks takes too much time",
zap.Duration("elapsed", elapsed),
zap.Int("range len", rangesLen),
zap.Int("task len", len(tasks)))
}
- metrics.TxnRegionsNumHistogramWithCoprocessor.Observe(float64(len(tasks)))
+ metrics.TxnRegionsNumHistogramWithCoprocessor.Observe(float64(builder.regionNum()))
return tasks, nil
}

type taskBuilder interface {
handle(*copTask) error
reverse()
build() []*copTask
regionNum() int
}
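
For orientation, here is a minimal, self-contained sketch (not part of the PR) of the builder pattern this interface introduces: buildCopTasks now drives a single taskBuilder through handle/reverse/build instead of branching on req.StoreBatchSize at every append. The task and legacyBuilder types below are simplified stand-ins for copTask and legacyTaskBuilder, not the real types.

package main

import "fmt"

// task is a simplified stand-in for copTask.
type task struct{ region string }

// taskBuilder mirrors the interface above: handle each per-region task,
// optionally reverse for descending scans, then build the final slice.
type taskBuilder interface {
	handle(*task) error
	reverse()
	build() []*task
	regionNum() int
}

// legacyBuilder models legacyTaskBuilder: it simply appends tasks in order.
type legacyBuilder struct{ tasks []*task }

func (b *legacyBuilder) handle(t *task) error {
	b.tasks = append(b.tasks, t)
	return nil
}

func (b *legacyBuilder) reverse() {
	for i, j := 0, len(b.tasks)-1; i < j; i, j = i+1, j-1 {
		b.tasks[i], b.tasks[j] = b.tasks[j], b.tasks[i]
	}
}

func (b *legacyBuilder) build() []*task { return b.tasks }
func (b *legacyBuilder) regionNum() int { return len(b.tasks) }

func main() {
	var b taskBuilder = &legacyBuilder{}
	for _, r := range []string{"r1", "r2", "r3"} {
		if err := b.handle(&task{region: r}); err != nil {
			panic(err)
		}
	}
	b.reverse() // req.Desc: emit tasks in reverse region order
	for _, t := range b.build() {
		fmt.Println(t.region) // r3, r2, r1
	}
	fmt.Println("regions:", b.regionNum()) // regions: 3
}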

type legacyTaskBuilder struct {
tasks []*copTask
}

func newLegacyTaskBuilder(hint int) *legacyTaskBuilder {
return &legacyTaskBuilder{
tasks: make([]*copTask, 0, hint),
}
}

func (b *legacyTaskBuilder) handle(task *copTask) error {
b.tasks = append(b.tasks, task)
return nil
}

func (b *legacyTaskBuilder) regionNum() int {
return len(b.tasks)
}

func (b *legacyTaskBuilder) reverse() {
reverseTasks(b.tasks)
}

func (b *legacyTaskBuilder) build() []*copTask {
return b.tasks
}

type batchStoreTaskBuilder struct {
bo *Backoffer
req *kv.Request
cache *RegionCache
taskID uint64
limit int
store2Idx map[uint64]int
tasks []*copTask
}

func newBatchTaskBuilder(bo *Backoffer, req *kv.Request, cache *RegionCache) *batchStoreTaskBuilder {
return &batchStoreTaskBuilder{
bo: bo,
req: req,
cache: cache,
taskID: 0,
limit: req.StoreBatchSize,
store2Idx: make(map[uint64]int, 16),
tasks: make([]*copTask, 0, 16),
}
}

func (b *batchStoreTaskBuilder) handle(task *copTask) (err error) {
b.taskID++
task.taskID = b.taskID
handled := false
defer func() {
if !handled && err == nil {
// fall back to the non-batch path; this mainly happens on a region miss.
b.tasks = append(b.tasks, task)
}
}()
if b.limit <= 0 {
return nil
}
batchedTask, err := b.cache.BuildBatchTask(b.bo, task, b.req.ReplicaRead)
if err != nil {
return err
}
if batchedTask == nil {
return nil
}
if idx, ok := b.store2Idx[batchedTask.storeID]; !ok || len(b.tasks[idx].batchTaskList) >= b.limit {
b.tasks = append(b.tasks, batchedTask.task)
b.store2Idx[batchedTask.storeID] = len(b.tasks) - 1
} else {
if b.tasks[idx].batchTaskList == nil {
b.tasks[idx].batchTaskList = make(map[uint64]*batchedCopTask, b.limit)
// disable paging for batched tasks.
b.tasks[idx].paging = false
b.tasks[idx].pagingSize = 0
}
if task.RowCountHint > 0 {
b.tasks[idx].RowCountHint += task.RowCountHint
}
b.tasks[idx].batchTaskList[task.taskID] = batchedTask
}
handled = true
return nil
}
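
The handled/defer shape above is the core of this PR's client-side region-error handling: any path that returns without a hard error and without batching the task falls back to appending it as a plain task. Below is a minimal runnable sketch of the idiom, where tryBatch is a hypothetical stand-in for cache.BuildBatchTask and the types are simplified.

package main

import "fmt"

type task struct{ id int }
type batched struct{ t *task }

var plain []*task // fallback destination, like b.tasks in the real builder

// tryBatch stands in for cache.BuildBatchTask: it may fail hard (err),
// decline (nil, nil; e.g. a region miss), or succeed.
func tryBatch(t *task) (*batched, error) {
	if t.id%2 == 0 {
		return nil, nil // decline: simulate a region miss
	}
	return &batched{t: t}, nil
}

func handle(t *task) (err error) {
	handled := false
	defer func() {
		// Not batched and no hard error: fall back to the plain path.
		if !handled && err == nil {
			plain = append(plain, t)
		}
	}()
	bt, err := tryBatch(t)
	if err != nil {
		return err
	}
	if bt == nil {
		return nil // the deferred fallback picks this task up
	}
	// ... add bt to the per-store batch here ...
	handled = true
	return nil
}

func main() {
	for i := 0; i < 4; i++ {
		if err := handle(&task{id: i}); err != nil {
			panic(err)
		}
	}
	fmt.Println("fell back:", len(plain)) // fell back: 2
}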

func (b *batchStoreTaskBuilder) regionNum() int {
// b.taskID is incremented once per region task, so its final value equals the number of region tasks.
return int(b.taskID)
}

func (b *batchStoreTaskBuilder) reverse() {
reverseTasks(b.tasks)
}

func (b *batchStoreTaskBuilder) build() []*copTask {
return b.tasks
}

func buildTiDBMemCopTasks(ranges *KeyRanges, req *kv.Request) ([]*copTask, error) {
servers, err := infosync.GetAllServerInfo(context.Background())
if err != nil {
@@ -1138,13 +1225,13 @@ func (worker *copIteratorWorker) handleCopResponse(bo *Backoffer, rpcCtx *tikv.R
if err != nil {
return remains, err
}
- return worker.handleBatchRemainsOnErr(bo, remains, resp.pbResp.BatchResponses, task, ch)
+ return worker.handleBatchRemainsOnErr(bo, remains, resp.pbResp.GetBatchResponses(), task, ch)
}
if lockErr := resp.pbResp.GetLocked(); lockErr != nil {
if err := worker.handleLockErr(bo, lockErr, task); err != nil {
return nil, err
}
- return worker.handleBatchRemainsOnErr(bo, []*copTask{task}, resp.pbResp.BatchResponses, task, ch)
+ return worker.handleBatchRemainsOnErr(bo, []*copTask{task}, resp.pbResp.GetBatchResponses(), task, ch)
}
if otherErr := resp.pbResp.GetOtherError(); otherErr != "" {
err := errors.Errorf("other error: %s", otherErr)
@@ -1250,16 +1337,26 @@ func (worker *copIteratorWorker) handleBatchRemainsOnErr(bo *Backoffer, remains
}

// handleBatchCopResponse handles the batched cop response.
// The tasks map is mutated in place, so the caller must not reuse it after this call.
func (worker *copIteratorWorker) handleBatchCopResponse(bo *Backoffer, batchResps []*coprocessor.StoreBatchTaskResponse, tasks map[uint64]*batchedCopTask, ch chan<- *copResponse) ([]*copTask, error) {
if len(tasks) == 0 {
return nil, nil
}
var remainTasks []*copTask
appendRemainTasks := func(tasks ...*copTask) {
if remainTasks == nil {
// lazily allocate, using the remaining tasks' length as the initial capacity
remainTasks = make([]*copTask, 0, len(tasks))
}
remainTasks = append(remainTasks, tasks...)
}
for _, batchResp := range batchResps {
- batchedTask, ok := tasks[batchResp.GetTaskId()]
+ taskID := batchResp.GetTaskId()
+ batchedTask, ok := tasks[taskID]
if !ok {
return nil, errors.Errorf("task id %d not found", batchResp.GetTaskId())
}
delete(tasks, taskID)
resp := &copResponse{
pbResp: &coprocessor.Response{
Data: batchResp.Data,
Expand All @@ -1276,15 +1373,15 @@ func (worker *copIteratorWorker) handleBatchCopResponse(bo *Backoffer, batchResp
if err != nil {
return nil, err
}
- remainTasks = append(remainTasks, remains...)
+ appendRemainTasks(remains...)
continue
}
// TODO: handle locks in batch
if lockErr := batchResp.GetLocked(); lockErr != nil {
if err := worker.handleLockErr(bo, resp.pbResp.GetLocked(), task); err != nil {
return nil, err
}
- remainTasks = append(remainTasks, task)
+ appendRemainTasks(task)
continue
}
if otherErr := batchResp.GetOtherError(); otherErr != "" {
@@ -1312,6 +1409,24 @@ func (worker *copIteratorWorker) handleBatchCopResponse(bo *Backoffer, batchResp
// TODO: check OOM
worker.sendToRespCh(resp, ch, false)
}
for _, t := range tasks {
task := t.task
// when the error was generated by the client, the response is empty; skip the warning in that case.
if len(batchResps) != 0 {
firstRangeStartKey := task.ranges.At(0).StartKey
lastRangeEndKey := task.ranges.At(task.ranges.Len() - 1).EndKey
logutil.Logger(bo.GetCtx()).Error("response of batched task missing",
zap.Uint64("id", task.taskID),
zap.Uint64("txnStartTS", worker.req.StartTs),
zap.Uint64("regionID", task.region.GetID()),
zap.Uint64("bucketsVer", task.bucketsVer),
zap.Int("rangeNums", task.ranges.Len()),
zap.ByteString("firstRangeStartKey", firstRangeStartKey),
zap.ByteString("lastRangeEndKey", lastRangeEndKey),
zap.String("storeAddr", task.storeAddr))
}
appendRemainTasks(t.task)
}
return remainTasks, nil
}

Review thread on the firstRangeStartKey/lastRangeEndKey fields above (lines +1424 to +1425):

Reviewer: What's the meaning of printing the start key of the first range and the end key of the last range? They don't belong to the same range, and there are also holes between them.

Author (you06): It's the start key and end key for one batched task (it might be in the same region); I just copied the log from here.

Reviewer: Okay.
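
The delete-then-scan bookkeeping above is what detects unanswered batched tasks: each response deletes its entry from the tasks map, so whatever remains in the map got no response and must be retried. A minimal, self-contained model of that pattern (illustrative IDs and strings, not real coprocessor responses):

package main

import "fmt"

func main() {
	// pending maps taskID -> task, like the tasks argument above.
	pending := map[uint64]string{1: "task-1", 2: "task-2", 3: "task-3"}
	responses := []uint64{2, 3} // task 1 never gets a response
	for _, id := range responses {
		if _, ok := pending[id]; !ok {
			panic(fmt.Sprintf("task id %d not found", id))
		}
		delete(pending, id) // consumed by this response
	}
	// Whatever is left in pending is missing a response: log and retry it.
	remain := make([]string, 0, len(pending))
	for _, t := range pending {
		remain = append(remain, t)
	}
	fmt.Println("retrying:", remain) // retrying: [task-1]
}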

4 changes: 2 additions & 2 deletions store/copr/region_cache.go
@@ -18,7 +18,6 @@ import (
"bytes"
"strconv"

"github.com/pingcap/errors"
"github.com/pingcap/kvproto/pkg/coprocessor"
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/pingcap/log"
@@ -210,8 +209,9 @@ func (c *RegionCache) BuildBatchTask(bo *Backoffer, task *copTask, replicaRead k
if err != nil {
return nil, err
}
+ // fallback to non-batch path
if rpcContext == nil {
- return nil, errors.Errorf("region %s missing", task.region.String())
+ return nil, nil
}
return &batchedCopTask{
task: task,