From 9d9eaca77fc7a52231d0425302536e793612ce38 Mon Sep 17 00:00:00 2001 From: you06 Date: Thu, 1 Dec 2022 23:58:02 +0800 Subject: [PATCH] store/copr: support batch coprocessor requests by store (#39525) ref pingcap/tidb#39361 --- distsql/request_builder.go | 1 + kv/kv.go | 2 + sessionctx/variable/session.go | 3 + sessionctx/variable/sysvar.go | 7 + sessionctx/variable/tidb_vars.go | 4 + store/copr/copr_test/coprocessor_test.go | 47 +++++ store/copr/coprocessor.go | 235 +++++++++++++++++++---- store/copr/region_cache.go | 28 +++ 8 files changed, 291 insertions(+), 36 deletions(-) diff --git a/distsql/request_builder.go b/distsql/request_builder.go index a293c4d10963e..09ab4094ab732 100644 --- a/distsql/request_builder.go +++ b/distsql/request_builder.go @@ -289,6 +289,7 @@ func (builder *RequestBuilder) SetFromSessionVars(sv *variable.SessionVars) *Req } builder.RequestSource.RequestSourceInternal = sv.InRestrictedSQL builder.RequestSource.RequestSourceType = sv.RequestSourceType + builder.StoreBatchSize = sv.StoreBatchSize return builder } diff --git a/kv/kv.go b/kv/kv.go index 8263746093a5c..38243aa13db08 100644 --- a/kv/kv.go +++ b/kv/kv.go @@ -537,6 +537,8 @@ type Request struct { RequestSource util.RequestSource // FixedRowCountHint is the optimization hint for copr request for task scheduling. FixedRowCountHint []int + // StoreBatchSize indicates the batch size of coprocessor in the same store. + StoreBatchSize int } // CoprRequestAdjuster is used to check and adjust a copr request according to specific rules. diff --git a/sessionctx/variable/session.go b/sessionctx/variable/session.go index 69c0d4d48c607..13871301d29c5 100644 --- a/sessionctx/variable/session.go +++ b/sessionctx/variable/session.go @@ -1319,6 +1319,9 @@ type SessionVars struct { // EnablePlanReplayerCapture indicates whether enabled plan replayer capture EnablePlanReplayerCapture bool + + // StoreBatchSize indicates the batch size limit of store batch, set this field to 0 to disable store batch. + StoreBatchSize int } // GetNewChunkWithCapacity Attempt to request memory from the chunk pool diff --git a/sessionctx/variable/sysvar.go b/sessionctx/variable/sysvar.go index 5af72b6ae21fb..cb53d5230fe25 100644 --- a/sessionctx/variable/sysvar.go +++ b/sessionctx/variable/sysvar.go @@ -2103,6 +2103,13 @@ var defaultSysVars = []*SysVar{ s.EnableReuseCheck = TiDBOptOn(val) return nil }}, + { + Scope: ScopeGlobal | ScopeSession, Name: TiDBStoreBatchSize, Value: strconv.FormatInt(DefTiDBStoreBatchSize, 10), + Type: TypeInt, MinValue: 0, MaxValue: 25000, SetSession: func(s *SessionVars, val string) error { + s.StoreBatchSize = TidbOptInt(val, DefTiDBStoreBatchSize) + return nil + }, + }, } // FeedbackProbability points to the FeedbackProbability in statistics package. diff --git a/sessionctx/variable/tidb_vars.go b/sessionctx/variable/tidb_vars.go index 362f2454a53df..0bc3784d857dc 100644 --- a/sessionctx/variable/tidb_vars.go +++ b/sessionctx/variable/tidb_vars.go @@ -781,6 +781,9 @@ const ( TiDBEnablePlanReplayerCapture = "tidb_enable_plan_replayer_capture" // TiDBEnableReusechunk indicates whether to enable chunk alloc TiDBEnableReusechunk = "tidb_enable_reuse_chunk" + + // TiDBStoreBatchSize indicates the batch size of coprocessor in the same store. + TiDBStoreBatchSize = "tidb_store_batch_size" ) // TiDB vars that have only global scope @@ -1108,6 +1111,7 @@ const ( DefTiDBUseAlloc = false DefTiDBEnablePlanReplayerCapture = false DefTiDBIndexMergeIntersectionConcurrency = ConcurrencyUnset + DefTiDBStoreBatchSize = 0 ) // Process global variables. diff --git a/store/copr/copr_test/coprocessor_test.go b/store/copr/copr_test/coprocessor_test.go index 208f2e2bd2190..584de99b11a75 100644 --- a/store/copr/copr_test/coprocessor_test.go +++ b/store/copr/copr_test/coprocessor_test.go @@ -98,3 +98,50 @@ func TestBuildCopIteratorWithRowCountHint(t *testing.T) { require.Equal(t, smallConc, 0) require.Equal(t, rateLimit.GetCapacity(), 4) } + +func TestBuildCopIteratorWithBatchStoreCopr(t *testing.T) { + // nil --- 'g' --- 'n' --- 't' --- nil + // <- 0 -> <- 1 -> <- 2 -> <- 3 -> + store, err := mockstore.NewMockStore( + mockstore.WithClusterInspector(func(c testutils.Cluster) { + mockstore.BootstrapWithMultiRegions(c, []byte("g"), []byte("n"), []byte("t")) + }), + ) + require.NoError(t, err) + defer require.NoError(t, store.Close()) + copClient := store.GetClient().(*copr.CopClient) + ctx := context.Background() + killed := uint32(0) + vars := kv.NewVariables(&killed) + opt := &kv.ClientSendOption{} + + req := &kv.Request{ + Tp: kv.ReqTypeDAG, + KeyRanges: kv.NewNonParitionedKeyRanges(copr.BuildKeyRanges("a", "c", "d", "e", "h", "x", "y", "z")), + FixedRowCountHint: []int{1, 1, 3, 3}, + Concurrency: 15, + StoreBatchSize: 1, + } + it, errRes := copClient.BuildCopIterator(ctx, req, vars, opt) + require.Nil(t, errRes) + tasks := it.GetTasks() + require.Equal(t, len(tasks), 2) + require.Equal(t, len(tasks[0].ToPBBatchTasks()), 1) + require.Equal(t, tasks[0].RowCountHint, 5) + require.Equal(t, len(tasks[1].ToPBBatchTasks()), 1) + require.Equal(t, tasks[1].RowCountHint, 9) + + req = &kv.Request{ + Tp: kv.ReqTypeDAG, + KeyRanges: kv.NewNonParitionedKeyRanges(copr.BuildKeyRanges("a", "c", "d", "e", "h", "x", "y", "z")), + FixedRowCountHint: []int{1, 1, 3, 3}, + Concurrency: 15, + StoreBatchSize: 3, + } + it, errRes = copClient.BuildCopIterator(ctx, req, vars, opt) + require.Nil(t, errRes) + tasks = it.GetTasks() + require.Equal(t, len(tasks), 1) + require.Equal(t, len(tasks[0].ToPBBatchTasks()), 3) + require.Equal(t, tasks[0].RowCountHint, 14) +} diff --git a/store/copr/coprocessor.go b/store/copr/coprocessor.go index fdff94a09717d..3bf5cd89104db 100644 --- a/store/copr/coprocessor.go +++ b/store/copr/coprocessor.go @@ -30,6 +30,7 @@ import ( "github.com/pingcap/failpoint" "github.com/pingcap/kvproto/pkg/coprocessor" "github.com/pingcap/kvproto/pkg/kvrpcpb" + "github.com/pingcap/kvproto/pkg/metapb" "github.com/pingcap/tidb/domain/infosync" "github.com/pingcap/tidb/errno" "github.com/pingcap/tidb/kv" @@ -131,6 +132,14 @@ func (c *CopClient) BuildCopIterator(ctx context.Context, req *kv.Request, vars failpoint.Inject("disableFixedRowCountHint", func(_ failpoint.Value) { req.FixedRowCountHint = nil }) + if req.Tp != kv.ReqTypeDAG || req.StoreType != kv.TiKV { + req.StoreBatchSize = 0 + } + // TODO: support keep-order batch + if req.ReplicaRead != kv.ReplicaReadLeader || req.KeepOrder { + // disable batch copr for follower read + req.StoreBatchSize = 0 + } bo := backoff.NewBackofferWithVars(ctx, copBuildTaskMaxBackoff, vars) var ( @@ -154,6 +163,8 @@ func (c *CopClient) BuildCopIterator(ctx context.Context, req *kv.Request, vars // This is because it's possible that TiDB merge multiple small partition into one region which break some assumption. // Keep it split by partition would be more safe. err = req.KeyRanges.ForEachPartitionWithErr(buildTaskFunc) + // only batch store requests in first build. + req.StoreBatchSize = 0 reqType := "null" if req.ClosestReplicaReadAdjuster != nil { reqType = "miss" @@ -224,6 +235,7 @@ func (c *CopClient) BuildCopIterator(ctx context.Context, req *kv.Request, vars // copTask contains a related Region and KeyRange for a kv.Request. type copTask struct { + taskID uint64 region tikv.RegionVerID bucketsVer uint64 ranges *KeyRanges @@ -241,6 +253,14 @@ type copTask struct { partitionIndex int64 // used by balanceBatchCopTask in PartitionTableScan requestSource util.RequestSource RowCountHint int // used for extra concurrency of small tasks, -1 for unknown row count + batchTaskList map[uint64]*batchedCopTask +} + +type batchedCopTask struct { + task *copTask + region coprocessor.RegionInfo + storeID uint64 + peer *metapb.Peer } func (r *copTask) String() string { @@ -248,6 +268,23 @@ func (r *copTask) String() string { r.region.GetID(), r.region.GetConfVer(), r.region.GetVer(), r.ranges.Len(), r.storeAddr) } +func (r *copTask) ToPBBatchTasks() []*coprocessor.StoreBatchTask { + if len(r.batchTaskList) == 0 { + return nil + } + pbTasks := make([]*coprocessor.StoreBatchTask, 0, len(r.batchTaskList)) + for _, task := range r.batchTaskList { + pbTasks = append(pbTasks, &coprocessor.StoreBatchTask{ + RegionId: task.region.GetRegionId(), + RegionEpoch: task.region.GetRegionEpoch(), + Peer: task.peer, + Ranges: task.region.GetRanges(), + TaskId: task.task.taskID, + }) + } + return pbTasks +} + // rangesPerTask limits the length of the ranges slice sent in one copTask. const rangesPerTask = 25000 @@ -276,6 +313,11 @@ func buildCopTasks(bo *Backoffer, cache *RegionCache, ranges *KeyRanges, req *kv tasks := make([]*copTask, 0, len(locs)) origRangeIdx := 0 + taskID := uint64(0) + var store2Idx map[uint64]int + if req.StoreBatchSize > 0 { + store2Idx = make(map[uint64]int, 16) + } for _, loc := range locs { // TiKV will return gRPC error if the message is too large. So we need to limit the length of the ranges slice // to make sure the message can be sent successfully. @@ -310,7 +352,8 @@ func buildCopTasks(bo *Backoffer, cache *RegionCache, ranges *KeyRanges, req *kv hint += req.FixedRowCountHint[nextOrigRangeIdx] } } - tasks = append(tasks, &copTask{ + task := &copTask{ + taskID: taskID, region: loc.Location.Region, bucketsVer: loc.getBucketVersion(), ranges: loc.Ranges.Slice(i, nextI), @@ -322,11 +365,35 @@ func buildCopTasks(bo *Backoffer, cache *RegionCache, ranges *KeyRanges, req *kv pagingSize: pagingSize, requestSource: req.RequestSource, RowCountHint: hint, - }) + } + if req.StoreBatchSize > 0 { + batchedTask, err := cache.BuildBatchTask(bo, task, req.ReplicaRead) + if err != nil { + return nil, err + } + if idx, ok := store2Idx[batchedTask.storeID]; !ok || len(tasks[idx].batchTaskList) >= req.StoreBatchSize { + tasks = append(tasks, batchedTask.task) + store2Idx[batchedTask.storeID] = len(tasks) - 1 + } else { + if tasks[idx].batchTaskList == nil { + tasks[idx].batchTaskList = make(map[uint64]*batchedCopTask, req.StoreBatchSize) + // disable paging for batched task. + tasks[idx].paging = false + tasks[idx].pagingSize = 0 + } + if task.RowCountHint > 0 { + tasks[idx].RowCountHint += task.RowCountHint + } + tasks[idx].batchTaskList[taskID] = batchedTask + } + } else { + tasks = append(tasks, task) + } i = nextI if req.Paging.Enable { pagingSize = paging.GrowPagingSize(pagingSize, req.Paging.MaxPagingSize) } + taskID++ } } @@ -689,6 +756,11 @@ func (it *copIterator) GetSendRate() *util.RateLimit { return it.sendRate } +// GetTasks returns the built tasks. +func (it *copIterator) GetTasks() []*copTask { + return it.tasks +} + func (sender *copIteratorTaskSender) sendToTaskCh(t *copTask, sendTo chan<- *copTask) (exit bool) { select { case sendTo <- t: @@ -872,6 +944,7 @@ func (worker *copIteratorWorker) handleTaskOnce(bo *Backoffer, task *copTask, ch Ranges: task.ranges.ToPBRanges(), SchemaVer: worker.req.SchemaVar, PagingSize: task.pagingSize, + Tasks: task.ToPBBatchTasks(), } var cacheKey []byte @@ -1057,37 +1130,17 @@ func (worker *copIteratorWorker) handleCopResponse(bo *Backoffer, rpcCtx *tikv.R return nil, errors.Trace(err) } // We may meet RegionError at the first packet, but not during visiting the stream. - return buildCopTasks(bo, worker.store.GetRegionCache(), task.ranges, worker.req, task.eventCb) + remains, err := buildCopTasks(bo, worker.store.GetRegionCache(), task.ranges, worker.req, task.eventCb) + if err != nil { + return remains, err + } + return worker.handleBatchRemainsOnErr(bo, remains, resp.pbResp.BatchResponses, task, ch) } - var resolveLockDetail *util.ResolveLockDetail if lockErr := resp.pbResp.GetLocked(); lockErr != nil { - resolveLockDetail = worker.getLockResolverDetails() - // Be care that we didn't redact the SQL statement because the log is DEBUG level. - if task.eventCb != nil { - task.eventCb(trxevents.WrapCopMeetLock(&trxevents.CopMeetLock{ - LockInfo: lockErr, - })) - } else { - logutil.Logger(bo.GetCtx()).Debug("coprocessor encounters lock", - zap.Stringer("lock", lockErr)) - } - resolveLocksOpts := txnlock.ResolveLocksOptions{ - CallerStartTS: worker.req.StartTs, - Locks: []*txnlock.Lock{txnlock.NewLock(lockErr)}, - Detail: resolveLockDetail, - } - resolveLocksRes, err1 := worker.kvclient.ResolveLocksWithOpts(bo.TiKVBackoffer(), resolveLocksOpts) - err1 = derr.ToTiDBErr(err1) - if err1 != nil { - return nil, errors.Trace(err1) - } - msBeforeExpired := resolveLocksRes.TTL - if msBeforeExpired > 0 { - if err := bo.BackoffWithMaxSleepTxnLockFast(int(msBeforeExpired), errors.New(lockErr.String())); err != nil { - return nil, errors.Trace(err) - } + if err := worker.handleLockErr(bo, lockErr, task); err != nil { + return nil, err } - return []*copTask{task}, nil + return worker.handleBatchRemainsOnErr(bo, []*copTask{task}, resp.pbResp.BatchResponses, task, ch) } if otherErr := resp.pbResp.GetOtherError(); otherErr != "" { err := errors.Errorf("other error: %s", otherErr) @@ -1116,7 +1169,7 @@ func (worker *copIteratorWorker) handleCopResponse(bo *Backoffer, rpcCtx *tikv.R } else if task.ranges != nil && task.ranges.Len() > 0 { resp.startKey = task.ranges.At(0).StartKey } - worker.handleCollectExecutionInfo(bo, rpcCtx, resp, resolveLockDetail) + worker.handleCollectExecutionInfo(bo, rpcCtx, resp) resp.respTime = costTime if resp.pbResp.IsCacheHit { coprCacheCounterHit.Add(1) @@ -1174,8 +1227,121 @@ func (worker *copIteratorWorker) handleCopResponse(bo *Backoffer, rpcCtx *tikv.R } } } + batchResps := resp.pbResp.BatchResponses worker.sendToRespCh(resp, ch, true) - return nil, nil + return worker.handleBatchCopResponse(bo, batchResps, task.batchTaskList, ch) +} + +func (worker *copIteratorWorker) handleBatchRemainsOnErr(bo *Backoffer, remains []*copTask, batchResp []*coprocessor.StoreBatchTaskResponse, task *copTask, ch chan<- *copResponse) ([]*copTask, error) { + if len(task.batchTaskList) == 0 { + return remains, nil + } + batchedTasks := task.batchTaskList + task.batchTaskList = nil + batchedRemains, err := worker.handleBatchCopResponse(bo, batchResp, batchedTasks, ch) + if err != nil { + return nil, err + } + return append(remains, batchedRemains...), nil +} + +// handle the batched cop response. +func (worker *copIteratorWorker) handleBatchCopResponse(bo *Backoffer, batchResps []*coprocessor.StoreBatchTaskResponse, tasks map[uint64]*batchedCopTask, ch chan<- *copResponse) ([]*copTask, error) { + if len(tasks) == 0 { + return nil, nil + } + var remainTasks []*copTask + for _, batchResp := range batchResps { + batchedTask, ok := tasks[batchResp.GetTaskId()] + if !ok { + return nil, errors.Errorf("task id %d not found", batchResp.GetTaskId()) + } + resp := &copResponse{ + pbResp: &coprocessor.Response{ + Data: batchResp.Data, + }, + } + task := batchedTask.task + if regionErr := batchResp.GetRegionError(); regionErr != nil { + errStr := fmt.Sprintf("region_id:%v, region_ver:%v, store_type:%s, peer_addr:%s, error:%s", + task.region.GetID(), task.region.GetVer(), task.storeType.Name(), task.storeAddr, regionErr.String()) + if err := bo.Backoff(tikv.BoRegionMiss(), errors.New(errStr)); err != nil { + return nil, errors.Trace(err) + } + remains, err := buildCopTasks(bo, worker.store.GetRegionCache(), task.ranges, worker.req, task.eventCb) + if err != nil { + return nil, err + } + remainTasks = append(remainTasks, remains...) + continue + } + //TODO: handle locks in batch + if lockErr := batchResp.GetLocked(); lockErr != nil { + if err := worker.handleLockErr(bo, resp.pbResp.GetLocked(), task); err != nil { + return nil, err + } + remainTasks = append(remainTasks, task) + continue + } + if otherErr := batchResp.GetOtherError(); otherErr != "" { + err := errors.Errorf("other error: %s", otherErr) + + firstRangeStartKey := task.ranges.At(0).StartKey + lastRangeEndKey := task.ranges.At(task.ranges.Len() - 1).EndKey + + logutil.Logger(bo.GetCtx()).Warn("other error", + zap.Uint64("txnStartTS", worker.req.StartTs), + zap.Uint64("regionID", task.region.GetID()), + zap.Uint64("bucketsVer", task.bucketsVer), + // TODO: add bucket version in log + //zap.Uint64("latestBucketsVer", batchResp.GetLatestBucketsVersion()), + zap.Int("rangeNums", task.ranges.Len()), + zap.ByteString("firstRangeStartKey", firstRangeStartKey), + zap.ByteString("lastRangeEndKey", lastRangeEndKey), + zap.String("storeAddr", task.storeAddr), + zap.Error(err)) + if strings.Contains(err.Error(), "write conflict") { + return nil, kv.ErrWriteConflict.FastGen("%s", otherErr) + } + return nil, errors.Trace(err) + } + // TODO: check OOM + worker.sendToRespCh(resp, ch, false) + } + return remainTasks, nil +} + +func (worker *copIteratorWorker) handleLockErr(bo *Backoffer, lockErr *kvrpcpb.LockInfo, task *copTask) error { + if lockErr == nil { + return nil + } + resolveLockDetail := worker.getLockResolverDetails() + // Be care that we didn't redact the SQL statement because the log is DEBUG level. + if task.eventCb != nil { + task.eventCb(trxevents.WrapCopMeetLock(&trxevents.CopMeetLock{ + LockInfo: lockErr, + })) + } else { + logutil.Logger(bo.GetCtx()).Debug("coprocessor encounters lock", + zap.Stringer("lock", lockErr)) + } + resolveLocksOpts := txnlock.ResolveLocksOptions{ + CallerStartTS: worker.req.StartTs, + Locks: []*txnlock.Lock{txnlock.NewLock(lockErr)}, + Detail: resolveLockDetail, + } + resolveLocksRes, err1 := worker.kvclient.ResolveLocksWithOpts(bo.TiKVBackoffer(), resolveLocksOpts) + err1 = derr.ToTiDBErr(err1) + if err1 != nil { + return errors.Trace(err1) + } + msBeforeExpired := resolveLocksRes.TTL + if msBeforeExpired > 0 { + if err := bo.BackoffWithMaxSleepTxnLockFast(int(msBeforeExpired), errors.New(lockErr.String())); err != nil { + return errors.Trace(err) + } + } + return nil } func (worker *copIteratorWorker) getLockResolverDetails() *util.ResolveLockDetail { @@ -1185,7 +1351,7 @@ func (worker *copIteratorWorker) getLockResolverDetails() *util.ResolveLockDetai return &util.ResolveLockDetail{} } -func (worker *copIteratorWorker) handleCollectExecutionInfo(bo *Backoffer, rpcCtx *tikv.RPCContext, resp *copResponse, resolveLockDetail *util.ResolveLockDetail) { +func (worker *copIteratorWorker) handleCollectExecutionInfo(bo *Backoffer, rpcCtx *tikv.RPCContext, resp *copResponse) { defer func() { worker.kvclient.Stats = nil }() @@ -1213,9 +1379,6 @@ func (worker *copIteratorWorker) handleCollectExecutionInfo(bo *Backoffer, rpcCt resp.detail.CalleeAddress = rpcCtx.Addr } sd := &util.ScanDetail{} - if resolveLockDetail != nil { - sd.ResolveLock = resolveLockDetail - } td := util.TimeDetail{} if pbDetails := resp.pbResp.ExecDetailsV2; pbDetails != nil { // Take values in `ExecDetailsV2` first. diff --git a/store/copr/region_cache.go b/store/copr/region_cache.go index 4aa970aa458a4..a3fd20e036d43 100644 --- a/store/copr/region_cache.go +++ b/store/copr/region_cache.go @@ -18,9 +18,13 @@ import ( "bytes" "strconv" + "github.com/pingcap/errors" + "github.com/pingcap/kvproto/pkg/coprocessor" + "github.com/pingcap/kvproto/pkg/metapb" "github.com/pingcap/log" "github.com/pingcap/tidb/kv" derr "github.com/pingcap/tidb/store/driver/error" + "github.com/pingcap/tidb/store/driver/options" "github.com/pingcap/tidb/util/logutil" "github.com/pingcap/tidb/util/mathutil" "github.com/tikv/client-go/v2/metrics" @@ -199,3 +203,27 @@ func (c *RegionCache) OnSendFailForBatchRegions(bo *Backoffer, store *tikv.Store c.OnSendFailForTiFlash(bo.TiKVBackoffer(), store, ri.Region, ri.Meta, scheduleReload, err, !(index < 10 || log.GetLevel() <= zap.DebugLevel)) } } + +// BuildBatchTask fetches store and peer info for cop task, wrap it as `batchedCopTask`. +func (c *RegionCache) BuildBatchTask(bo *Backoffer, task *copTask, replicaRead kv.ReplicaReadType) (*batchedCopTask, error) { + rpcContext, err := c.GetTiKVRPCContext(bo.TiKVBackoffer(), task.region, options.GetTiKVReplicaReadType(replicaRead), 0) + if err != nil { + return nil, err + } + if rpcContext == nil { + return nil, errors.Errorf("region %s missing", task.region.String()) + } + return &batchedCopTask{ + task: task, + region: coprocessor.RegionInfo{ + RegionId: rpcContext.Region.GetID(), + RegionEpoch: &metapb.RegionEpoch{ + ConfVer: rpcContext.Region.GetConfVer(), + Version: rpcContext.Region.GetVer(), + }, + Ranges: task.ranges.ToPBRanges(), + }, + storeID: rpcContext.Store.StoreID(), + peer: rpcContext.Peer, + }, nil +}