store/copr: handle region error from client (#39838)
ref #39361
you06 authored Dec 13, 2022
1 parent 90a7398 commit d0d6955
Showing 2 changed files with 154 additions and 39 deletions.
189 changes: 152 additions & 37 deletions store/copr/coprocessor.go
@@ -140,7 +140,7 @@ func (c *CopClient) BuildCopIterator(ctx context.Context, req *kv.Request, vars
// disable batch copr for follower read
req.StoreBatchSize = 0
}
// disable paging for batch copr
// disable batch copr when paging is enabled.
if req.Paging.Enable {
req.StoreBatchSize = 0
}
@@ -315,13 +315,13 @@ func buildCopTasks(bo *Backoffer, cache *RegionCache, ranges *KeyRanges, req *kv
chanSize = 18
}

tasks := make([]*copTask, 0, len(locs))
origRangeIdx := 0
taskID := uint64(0)
var store2Idx map[uint64]int
var builder taskBuilder
if req.StoreBatchSize > 0 {
store2Idx = make(map[uint64]int, 16)
builder = newBatchTaskBuilder(bo, req, cache)
} else {
builder = newLegacyTaskBuilder(len(locs))
}
origRangeIdx := 0
for _, loc := range locs {
// TiKV will return gRPC error if the message is too large. So we need to limit the length of the ranges slice
// to make sure the message can be sent successfully.
@@ -357,7 +357,6 @@ func buildCopTasks(bo *Backoffer, cache *RegionCache, ranges *KeyRanges, req *kv
}
}
task := &copTask{
taskID: taskID,
region: loc.Location.Region,
bucketsVer: loc.getBucketVersion(),
ranges: loc.Ranges.Slice(i, nextI),
@@ -370,50 +369,138 @@ func buildCopTasks(bo *Backoffer, cache *RegionCache, ranges *KeyRanges, req *kv
requestSource: req.RequestSource,
RowCountHint: hint,
}
if req.StoreBatchSize > 0 {
batchedTask, err := cache.BuildBatchTask(bo, task, req.ReplicaRead)
if err != nil {
return nil, err
}
if idx, ok := store2Idx[batchedTask.storeID]; !ok || len(tasks[idx].batchTaskList) >= req.StoreBatchSize {
tasks = append(tasks, batchedTask.task)
store2Idx[batchedTask.storeID] = len(tasks) - 1
} else {
if tasks[idx].batchTaskList == nil {
tasks[idx].batchTaskList = make(map[uint64]*batchedCopTask, req.StoreBatchSize)
// disable paging for batched task.
tasks[idx].paging = false
tasks[idx].pagingSize = 0
}
if task.RowCountHint > 0 {
tasks[idx].RowCountHint += task.RowCountHint
}
tasks[idx].batchTaskList[taskID] = batchedTask
}
} else {
tasks = append(tasks, task)
if err = builder.handle(task); err != nil {
return nil, err
}
i = nextI
if req.Paging.Enable {
pagingSize = paging.GrowPagingSize(pagingSize, req.Paging.MaxPagingSize)
}
taskID++
}
}

if req.Desc {
reverseTasks(tasks)
builder.reverse()
}
tasks := builder.build()
if elapsed := time.Since(start); elapsed > time.Millisecond*500 {
logutil.BgLogger().Warn("buildCopTasks takes too much time",
zap.Duration("elapsed", elapsed),
zap.Int("range len", rangesLen),
zap.Int("task len", len(tasks)))
}
metrics.TxnRegionsNumHistogramWithCoprocessor.Observe(float64(len(tasks)))
metrics.TxnRegionsNumHistogramWithCoprocessor.Observe(float64(builder.regionNum()))
return tasks, nil
}

type taskBuilder interface {
handle(*copTask) error
reverse()
build() []*copTask
regionNum() int
}

type legacyTaskBuilder struct {
tasks []*copTask
}

func newLegacyTaskBuilder(hint int) *legacyTaskBuilder {
return &legacyTaskBuilder{
tasks: make([]*copTask, 0, hint),
}
}

func (b *legacyTaskBuilder) handle(task *copTask) error {
b.tasks = append(b.tasks, task)
return nil
}

func (b *legacyTaskBuilder) regionNum() int {
return len(b.tasks)
}

func (b *legacyTaskBuilder) reverse() {
reverseTasks(b.tasks)
}

func (b *legacyTaskBuilder) build() []*copTask {
return b.tasks
}

type batchStoreTaskBuilder struct {
bo *Backoffer
req *kv.Request
cache *RegionCache
taskID uint64
limit int
store2Idx map[uint64]int
tasks []*copTask
}

func newBatchTaskBuilder(bo *Backoffer, req *kv.Request, cache *RegionCache) *batchStoreTaskBuilder {
return &batchStoreTaskBuilder{
bo: bo,
req: req,
cache: cache,
taskID: 0,
limit: req.StoreBatchSize,
store2Idx: make(map[uint64]int, 16),
tasks: make([]*copTask, 0, 16),
}
}

func (b *batchStoreTaskBuilder) handle(task *copTask) (err error) {
b.taskID++
task.taskID = b.taskID
handled := false
defer func() {
if !handled && err == nil {
// fall back to the non-batched way; this is mainly caused by a region miss.
b.tasks = append(b.tasks, task)
}
}()
if b.limit <= 0 {
return nil
}
batchedTask, err := b.cache.BuildBatchTask(b.bo, task, b.req.ReplicaRead)
if err != nil {
return err
}
if batchedTask == nil {
return nil
}
if idx, ok := b.store2Idx[batchedTask.storeID]; !ok || len(b.tasks[idx].batchTaskList) >= b.limit {
b.tasks = append(b.tasks, batchedTask.task)
b.store2Idx[batchedTask.storeID] = len(b.tasks) - 1
} else {
if b.tasks[idx].batchTaskList == nil {
b.tasks[idx].batchTaskList = make(map[uint64]*batchedCopTask, b.limit)
// disable paging for batched task.
b.tasks[idx].paging = false
b.tasks[idx].pagingSize = 0
}
if task.RowCountHint > 0 {
b.tasks[idx].RowCountHint += task.RowCountHint
}
b.tasks[idx].batchTaskList[task.taskID] = batchedTask
}
handled = true
return nil
}

func (b *batchStoreTaskBuilder) regionNum() int {
// we allocate b.taskID for each region task, so the final b.taskID equals the number of region tasks.
return int(b.taskID)
}

func (b *batchStoreTaskBuilder) reverse() {
reverseTasks(b.tasks)
}

func (b *batchStoreTaskBuilder) build() []*copTask {
return b.tasks
}
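
For reference, the refactor above replaces the inline branching on req.StoreBatchSize in buildCopTasks with the taskBuilder interface: the task-building loop only calls handle/reverse/build, and the per-store batching policy lives entirely in batchStoreTaskBuilder. The following standalone Go sketch illustrates the same grouping idea (a new batch is opened once the current batch for a store reaches the limit, like store2Idx plus StoreBatchSize). All types and names in it are hypothetical simplifications, not the actual TiDB code.

package main

import "fmt"

// toyTask is a stand-in for copTask, carrying only the fields the sketch needs.
type toyTask struct {
	id      uint64
	storeID uint64
}

// toyBuilder mirrors the taskBuilder idea: callers feed tasks one by one and
// ask for the final result at the end.
type toyBuilder interface {
	handle(*toyTask)
	build() [][]*toyTask
}

// storeBatchBuilder groups tasks by store, opening a new batch once the
// current batch for that store is full.
type storeBatchBuilder struct {
	limit    int
	batches  [][]*toyTask
	storeIdx map[uint64]int
}

func newStoreBatchBuilder(limit int) *storeBatchBuilder {
	return &storeBatchBuilder{limit: limit, storeIdx: make(map[uint64]int)}
}

func (b *storeBatchBuilder) handle(t *toyTask) {
	if idx, ok := b.storeIdx[t.storeID]; ok && len(b.batches[idx]) < b.limit {
		b.batches[idx] = append(b.batches[idx], t)
		return
	}
	b.batches = append(b.batches, []*toyTask{t})
	b.storeIdx[t.storeID] = len(b.batches) - 1
}

func (b *storeBatchBuilder) build() [][]*toyTask { return b.batches }

func main() {
	var builder toyBuilder = newStoreBatchBuilder(2)
	stores := []uint64{1, 1, 2, 1, 2, 2}
	for i, s := range stores {
		builder.handle(&toyTask{id: uint64(i + 1), storeID: s})
	}
	for _, batch := range builder.build() {
		fmt.Printf("store %d: %d task(s)\n", batch[0].storeID, len(batch))
	}
}

Note also the handled flag plus defer in batchStoreTaskBuilder.handle above: when a task cannot be batched (for example BuildBatchTask returns nil after a region miss), it is still appended as a plain copTask, so no task is silently dropped.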

func buildTiDBMemCopTasks(ranges *KeyRanges, req *kv.Request) ([]*copTask, error) {
servers, err := infosync.GetAllServerInfo(context.Background())
if err != nil {
@@ -1138,13 +1225,13 @@ func (worker *copIteratorWorker) handleCopResponse(bo *Backoffer, rpcCtx *tikv.R
if err != nil {
return remains, err
}
return worker.handleBatchRemainsOnErr(bo, remains, resp.pbResp.BatchResponses, task, ch)
return worker.handleBatchRemainsOnErr(bo, remains, resp.pbResp.GetBatchResponses(), task, ch)
}
if lockErr := resp.pbResp.GetLocked(); lockErr != nil {
if err := worker.handleLockErr(bo, lockErr, task); err != nil {
return nil, err
}
return worker.handleBatchRemainsOnErr(bo, []*copTask{task}, resp.pbResp.BatchResponses, task, ch)
return worker.handleBatchRemainsOnErr(bo, []*copTask{task}, resp.pbResp.GetBatchResponses(), task, ch)
}
if otherErr := resp.pbResp.GetOtherError(); otherErr != "" {
err := errors.Errorf("other error: %s", otherErr)
@@ -1250,16 +1337,26 @@ func (worker *copIteratorWorker) handleBatchRemainsOnErr(bo *Backoffer, remains
}

// handle the batched cop response.
// tasks will be changed, so the input tasks should not be used after calling this function.
func (worker *copIteratorWorker) handleBatchCopResponse(bo *Backoffer, batchResps []*coprocessor.StoreBatchTaskResponse, tasks map[uint64]*batchedCopTask, ch chan<- *copResponse) ([]*copTask, error) {
if len(tasks) == 0 {
return nil, nil
}
var remainTasks []*copTask
appendRemainTasks := func(tasks ...*copTask) {
if remainTasks == nil {
// allocate with the remaining tasks' length as the initial capacity
remainTasks = make([]*copTask, 0, len(tasks))
}
remainTasks = append(remainTasks, tasks...)
}
for _, batchResp := range batchResps {
batchedTask, ok := tasks[batchResp.GetTaskId()]
taskID := batchResp.GetTaskId()
batchedTask, ok := tasks[taskID]
if !ok {
return nil, errors.Errorf("task id %d not found", batchResp.GetTaskId())
}
delete(tasks, taskID)
resp := &copResponse{
pbResp: &coprocessor.Response{
Data: batchResp.Data,
@@ -1276,15 +1373,15 @@ func (worker *copIteratorWorker) handleBatchCopResponse(bo *Backoffer, batchResp
if err != nil {
return nil, err
}
remainTasks = append(remainTasks, remains...)
appendRemainTasks(remains...)
continue
}
//TODO: handle locks in batch
if lockErr := batchResp.GetLocked(); lockErr != nil {
if err := worker.handleLockErr(bo, resp.pbResp.GetLocked(), task); err != nil {
return nil, err
}
remainTasks = append(remainTasks, task)
appendRemainTasks(task)
continue
}
if otherErr := batchResp.GetOtherError(); otherErr != "" {
@@ -1312,6 +1409,24 @@
// TODO: check OOM
worker.sendToRespCh(resp, ch, false)
}
for _, t := range tasks {
task := t.task
// when the error is generated by the client, the response is empty; skip the warning in this case.
if len(batchResps) != 0 {
firstRangeStartKey := task.ranges.At(0).StartKey
lastRangeEndKey := task.ranges.At(task.ranges.Len() - 1).EndKey
logutil.Logger(bo.GetCtx()).Error("response of batched task missing",
zap.Uint64("id", task.taskID),
zap.Uint64("txnStartTS", worker.req.StartTs),
zap.Uint64("regionID", task.region.GetID()),
zap.Uint64("bucketsVer", task.bucketsVer),
zap.Int("rangeNums", task.ranges.Len()),
zap.ByteString("firstRangeStartKey", firstRangeStartKey),
zap.ByteString("lastRangeEndKey", lastRangeEndKey),
zap.String("storeAddr", task.storeAddr))
}
appendRemainTasks(t.task)
}
return remainTasks, nil
}
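
The other behavioral change above is that handleBatchCopResponse now deletes each answered task from the tasks map and, after the loop, re-schedules whatever is left, logging a warning unless the whole batch response was empty because the error came from the client. A rough standalone sketch of that bookkeeping, with hypothetical names and without the logging and error handling of the real code:

package main

import "fmt"

// splitByResponse is a hypothetical stand-in for the new bookkeeping in
// handleBatchCopResponse: pending tasks answered by a response are consumed,
// and whatever remains in the map is returned for retry.
func splitByResponse(pending map[uint64]string, respIDs []uint64) (handled, retry []string) {
	for _, id := range respIDs {
		task, ok := pending[id]
		if !ok {
			continue // the real code treats an unknown task id as an error.
		}
		handled = append(handled, task)
		delete(pending, id)
	}
	for _, task := range pending {
		retry = append(retry, task) // no response for this task: re-schedule it.
	}
	return handled, retry
}

func main() {
	pending := map[uint64]string{1: "region-1", 2: "region-2", 3: "region-3"}
	handled, retry := splitByResponse(pending, []uint64{1, 3})
	fmt.Println(len(handled), len(retry)) // 2 1
}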

4 changes: 2 additions & 2 deletions store/copr/region_cache.go
@@ -18,7 +18,6 @@ import (
"bytes"
"strconv"

"github.com/pingcap/errors"
"github.com/pingcap/kvproto/pkg/coprocessor"
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/pingcap/log"
@@ -210,8 +209,9 @@ func (c *RegionCache) BuildBatchTask(bo *Backoffer, task *copTask, replicaRead k
if err != nil {
return nil, err
}
// fallback to non-batch path
if rpcContext == nil {
return nil, errors.Errorf("region %s missing", task.region.String())
return nil, nil
}
return &batchedCopTask{
task: task,
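
In region_cache.go the change above turns a region miss from a hard error into a soft signal: BuildBatchTask now returns (nil, nil) when no RPC context is available, and the caller (batchStoreTaskBuilder.handle) treats a nil batched task as "fall back to the non-batched path". A minimal sketch of that calling convention; the function and scenario names here are illustrative assumptions, not the real API.

package main

import (
	"errors"
	"fmt"
)

type batched struct{ storeID uint64 }

// tryBuildBatch stands in for RegionCache.BuildBatchTask after this change:
// a hard failure returns an error, while "region not resolvable right now"
// returns (nil, nil) so the caller can fall back instead of aborting.
func tryBuildBatch(hasReplica, hardFail bool) (*batched, error) {
	if hardFail {
		return nil, errors.New("backoff exhausted")
	}
	if !hasReplica {
		return nil, nil // region miss: caller should use the non-batched path.
	}
	return &batched{storeID: 1}, nil
}

func main() {
	for _, c := range []struct{ hasReplica, hardFail bool }{
		{true, false}, {false, false}, {false, true},
	} {
		b, err := tryBuildBatch(c.hasReplica, c.hardFail)
		switch {
		case err != nil:
			fmt.Println("error:", err)
		case b == nil:
			fmt.Println("fall back to a plain copTask")
		default:
			fmt.Println("batched to store", b.storeID)
		}
	}
}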
