From faf5bd9ebab4037b741b1b913284d1750df74186 Mon Sep 17 00:00:00 2001
From: cfzjywxk
Date: Wed, 21 Aug 2019 16:44:16 +0800
Subject: [PATCH 1/6] add rateLimiter into batch actions

---
 store/tikv/2pc.go | 142 +++++++++++++++++++++++++++++-----------------
 1 file changed, 90 insertions(+), 52 deletions(-)

diff --git a/store/tikv/2pc.go b/store/tikv/2pc.go
index f0403e7334ed9..70b6b20025289 100644
--- a/store/tikv/2pc.go
+++ b/store/tikv/2pc.go
@@ -115,6 +115,13 @@ type twoPhaseCommitter struct {
 	regionTxnSize map[uint64]int
 }
 
+// taskProcessor adds a rateLimiter to control the concurrency of txn 2PC
+type taskProcessor struct {
+	rateLimiter *rateLimit
+}
+
+type procFunc func(bo *Backoffer, batch batchKeys) error
+
 type mutationEx struct {
 	pb.Mutation
 	asserted bool
@@ -405,58 +412,9 @@ func (c *twoPhaseCommitter) doActionOnBatches(bo *Backoffer, action twoPhaseComm
 		return errors.Trace(e)
 	}
 
-	// For prewrite, stop sending other requests after receiving first error.
-	backoffer := bo
-	var cancel context.CancelFunc
-	if action == actionPrewrite {
-		backoffer, cancel = bo.Fork()
-		defer cancel()
-	}
-
-	// Concurrently do the work for each batch.
-	ch := make(chan error, len(batches))
-	for _, batch1 := range batches {
-
-		batch := batch1
-		go func() {
-			if action == actionCommit {
-				// Because the secondary batches of the commit actions are implemented to be
-				// committed asynchronously in background goroutines, we should not
-				// fork a child context and call cancel() while the foreground goroutine exits.
-				// Otherwise the background goroutines will be canceled exceptionally.
-				// Here we make a new clone of the original backoffer for this goroutine
-				// exclusively to avoid the data race when using the same backoffer
-				// in concurrent goroutines.
-				singleBatchBackoffer := backoffer.Clone()
-				ch <- singleBatchActionFunc(singleBatchBackoffer, batch)
-			} else {
-				singleBatchBackoffer, singleBatchCancel := backoffer.Fork()
-				defer singleBatchCancel()
-				ch <- singleBatchActionFunc(singleBatchBackoffer, batch)
-			}
-		}()
-	}
-	var err error
-	for i := 0; i < len(batches); i++ {
-		if e := <-ch; e != nil {
-			logutil.BgLogger().Debug("2PC doActionOnBatches failed",
-				zap.Uint64("conn", c.connID),
-				zap.Stringer("action type", action),
-				zap.Error(e),
-				zap.Uint64("txnStartTS", c.startTS))
-			// Cancel other requests and return the first error.
-			if cancel != nil {
-				logutil.BgLogger().Debug("2PC doActionOnBatches to cancel other actions",
-					zap.Uint64("conn", c.connID),
-					zap.Stringer("action type", action),
-					zap.Uint64("txnStartTS", c.startTS))
-				cancel()
-			}
-			if err == nil {
-				err = e
-			}
-		}
-	}
+	rateLim := len(batches) // this will be used for LargeTxn
+	processor := newTaskProcessor(rateLim)
+	err := processor.process(c, singleBatchActionFunc, bo, action, batches)
 	return errors.Trace(err)
 }
 
@@ -1050,3 +1008,83 @@ func appendBatchBySize(b []batchKeys, region RegionVerID, keys [][]byte, sizeFn
 	}
 	return b
 }
+
+// newTaskProcessor creates a processor to handle concurrent batch work (prewrite/commit etc.)
+func newTaskProcessor(rateLimit int) *taskProcessor {
+	return &taskProcessor{newRateLimit(rateLimit)}
+}
+
+// startWorker concurrently does the work for each batch, subject to the rate limit
+func (tp *taskProcessor) startWorker(exitCh chan struct{}, ch chan error,
+	procFn procFunc, action twoPhaseCommitAction,
+	backoffer *Backoffer, batches []batchKeys) {
+	for idx, batch1 := range batches {
+		if exit := tp.rateLimiter.getToken(exitCh); !exit {
+			batch := batch1
+			go func(i int) {
+				var procRes error
+				defer tp.rateLimiter.putToken()
+				if action == actionCommit {
+					// Because the secondary batches of the commit actions are implemented to be
+					// committed asynchronously in background goroutines, we should not
+					// fork a child context and call cancel() while the foreground goroutine exits.
+					// Otherwise the background goroutines will be canceled exceptionally.
+					// Here we make a new clone of the original backoffer for this goroutine
+					// exclusively to avoid the data race when using the same backoffer
+					// in concurrent goroutines.
+					singleBatchBackoffer := backoffer.Clone()
+					procRes = procFn(singleBatchBackoffer, batch)
+				} else {
+					singleBatchBackoffer, singleBatchCancel := backoffer.Fork()
+					defer singleBatchCancel()
+					procRes = procFn(singleBatchBackoffer, batch)
+				}
+				ch <- procRes
+			}(idx)
+		} else {
+			logutil.Logger(backoffer.ctx).Info("break startWorker", zap.Stringer("action", action))
+			break
+		}
+	}
+}
+
+// process starts the worker goroutine and collects results
+func (tp *taskProcessor) process(committer *twoPhaseCommitter, procFn procFunc,
+	bo *Backoffer, action twoPhaseCommitAction, batches []batchKeys) error {
+	var err error
+	c := committer
+	// For prewrite, stop sending other requests after receiving first error.
+	backoffer := bo
+	var cancel context.CancelFunc
+	if action == actionPrewrite {
+		backoffer, cancel = bo.Fork()
+		defer cancel()
+	}
+	// concurrently do the work for each batch.
+	ch := make(chan error, len(batches))
+	exitCh := make(chan struct{})
+	go tp.startWorker(exitCh, ch, procFn, action, backoffer, batches)
+	// check results
+	for i := 0; i < len(batches); i++ {
+		if e := <-ch; e != nil {
+			logutil.Logger(backoffer.ctx).Debug("2PC doActionOnBatches failed",
+				zap.Uint64("conn", c.connID),
+				zap.Stringer("action type", action),
+				zap.Error(e),
+				zap.Uint64("txnStartTS", c.startTS))
+			// Cancel other requests and return the first error.
+			if cancel != nil {
+				logutil.Logger(backoffer.ctx).Debug("2PC doActionOnBatches to cancel other actions",
+					zap.Uint64("conn", c.connID),
+					zap.Stringer("action type", action),
+					zap.Uint64("txnStartTS", c.startTS))
+				cancel()
+			}
+			if err == nil {
+				err = e
+			}
+		}
+	}
+	close(exitCh)
+	return err
+}
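Note: taskProcessor leans on the rateLimit utility that already exists in store/tikv (newRateLimit/getToken/putToken); its implementation is not part of this diff. A minimal sketch of the assumed contract only — a buffered channel used as a token pool, not the real code:

package main

import "fmt"

// rateLimit caps the number of concurrently running workers at n.
type rateLimit struct {
	token chan struct{}
}

func newRateLimit(n int) *rateLimit {
	return &rateLimit{token: make(chan struct{}, n)}
}

// getToken blocks until a worker slot frees up or done is closed.
// It reports exit=true when the caller should stop scheduling work.
func (r *rateLimit) getToken(done chan struct{}) (exit bool) {
	select {
	case <-done:
		return true
	case r.token <- struct{}{}:
		return false
	}
}

// putToken releases a slot previously acquired by getToken.
func (r *rateLimit) putToken() {
	select {
	case <-r.token:
	default:
		panic("put a redundant token")
	}
}

func main() {
	lim := newRateLimit(2)
	done := make(chan struct{})
	fmt.Println(lim.getToken(done)) // false: first slot taken
	fmt.Println(lim.getToken(done)) // false: second slot taken
	close(done)
	fmt.Println(lim.getToken(done)) // true: pool is full and done is closed
	lim.putToken()                  // release one slot again
}

With rateLim set to len(batches), every batch obtains a token immediately, so this patch preserves the old all-at-once concurrency; the limit only starts to bite once a smaller value is chosen for large transactions.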
From 0cc04cfc260a39c7df6d80e96efeb8b93f6bd002 Mon Sep 17 00:00:00 2001
From: cfzjywxk
Date: Fri, 23 Aug 2019 10:28:57 +0800
Subject: [PATCH 2/6] change func input param and format

---
 store/tikv/2pc.go | 10 +++++-----
 1 file changed, 5 insertions(+), 5 deletions(-)

diff --git a/store/tikv/2pc.go b/store/tikv/2pc.go
index 70b6b20025289..afa89af93ddcf 100644
--- a/store/tikv/2pc.go
+++ b/store/tikv/2pc.go
@@ -1021,7 +1021,7 @@ func (tp *taskProcessor) startWorker(exitCh chan struct{}, ch chan error,
 	for idx, batch1 := range batches {
 		if exit := tp.rateLimiter.getToken(exitCh); !exit {
 			batch := batch1
-			go func(i int) {
+			go func() {
 				var procRes error
 				defer tp.rateLimiter.putToken()
 				if action == actionCommit {
@@ -1040,19 +1040,19 @@ func (tp *taskProcessor) startWorker(exitCh chan struct{}, ch chan error,
 					procRes = procFn(singleBatchBackoffer, batch)
 				}
 				ch <- procRes
-			}(idx)
+			}()
 		} else {
-			logutil.Logger(backoffer.ctx).Info("break startWorker", zap.Stringer("action", action))
+			logutil.Logger(backoffer.ctx).Info("break startWorker", zap.Stringer("action", action),
+				zap.Int("batch size", len(batches)), zap.Int("index", idx))
 			break
 		}
 	}
 }
 
 // process starts the worker goroutine and collects results
-func (tp *taskProcessor) process(committer *twoPhaseCommitter, procFn procFunc,
+func (tp *taskProcessor) process(c *twoPhaseCommitter, procFn procFunc,
 	bo *Backoffer, action twoPhaseCommitAction, batches []batchKeys) error {
 	var err error
-	c := committer
 	// For prewrite, stop sending other requests after receiving first error.
 	backoffer := bo
 	var cancel context.CancelFunc

From 37f03cf03184e54b35266edd9451dd4b9275b23a Mon Sep 17 00:00:00 2001
From: cfzjywxk
Date: Fri, 23 Aug 2019 11:42:28 +0800
Subject: [PATCH 3/6] change task processor into batchExecutor, inject related
 fields into this struct instead of func params

---
 store/tikv/2pc.go | 113 ++++++++++++++++++++++++++--------------------
 1 file changed, 63 insertions(+), 50 deletions(-)

diff --git a/store/tikv/2pc.go b/store/tikv/2pc.go
index afa89af93ddcf..781fcc8bdf036 100644
--- a/store/tikv/2pc.go
+++ b/store/tikv/2pc.go
@@ -115,12 +115,16 @@ type twoPhaseCommitter struct {
 	regionTxnSize map[uint64]int
 }
 
-// taskProcessor adds a rateLimiter to control the concurrency of txn 2PC
-type taskProcessor struct {
-	rateLimiter *rateLimit
+// batchExecutor is a txn controller providing rate-control utilities
+type batchExecutor struct {
+	rateLimiter *rateLimit // rate limit for concurrency control, maybe more strategies
+	committer *twoPhaseCommitter // here maybe more different type committer in the future
+	action twoPhaseCommitAction // the work action type
+	procFn procOneBatchFn // injected proc batch function
+	backoffer *Backoffer // Backoffer
 }
 
-type procFunc func(bo *Backoffer, batch batchKeys) error
+type procOneBatchFn func(bo *Backoffer, batch batchKeys) error
 
 type mutationEx struct {
 	pb.Mutation
@@ -384,10 +388,18 @@ func (c *twoPhaseCommitter) doActionOnKeys(bo *Backoffer, action twoPhaseCommitA
 
 // doActionOnBatches does action to batches in parallel.
 func (c *twoPhaseCommitter) doActionOnBatches(bo *Backoffer, action twoPhaseCommitAction, batches []batchKeys) error {
-	if len(batches) == 0 {
-		return nil
+	singleBatchActionFunc, err := c.getProcFuncByType(action)
+	if err != nil {
+		return err
 	}
-	var singleBatchActionFunc func(bo *Backoffer, batch batchKeys) error
+	rateLim := len(batches) // this will be used for LargeTxn, set rateLim here
+	batchExecutor := newBatchExecutor(rateLim, c, action, singleBatchActionFunc, bo)
+	err = batchExecutor.process(batches)
+	return errors.Trace(err)
+}
+
+func (c *twoPhaseCommitter) getProcFuncByType(action twoPhaseCommitAction) (procOneBatchFn, error) {
+	var singleBatchActionFunc procOneBatchFn
 	switch action {
 	case actionPrewrite:
 		singleBatchActionFunc = c.prewriteSingleBatch
 	case actionCommit:
 		singleBatchActionFunc = c.commitSingleBatch
 	case actionCleanup:
 		singleBatchActionFunc = c.cleanupSingleBatch
 	case actionPessimisticLock:
 		singleBatchActionFunc = c.pessimisticLockSingleBatch
 	case actionPessimisticRollback:
 		singleBatchActionFunc = c.pessimisticRollbackSingleBatch
+	default:
+		return nil, errors.Errorf("invalid action type=%v", action)
 	}
-	if len(batches) == 1 {
-		e := singleBatchActionFunc(bo, batches[0])
-		if e != nil {
-			logutil.BgLogger().Debug("2PC doActionOnBatches failed",
-				zap.Uint64("conn", c.connID),
-				zap.Stringer("action type", action),
-				zap.Error(e),
-				zap.Uint64("txnStartTS", c.startTS))
-		}
-		return errors.Trace(e)
-	}
-
-	rateLim := len(batches) // this will be used for LargeTxn
-	processor := newTaskProcessor(rateLim)
-	err := processor.process(c, singleBatchActionFunc, bo, action, batches)
-	return errors.Trace(err)
+	return singleBatchActionFunc, nil
 }
 
 func (c *twoPhaseCommitter) keyValueSize(key []byte) int {
@@ -1009,22 +1008,22 @@ func appendBatchBySize(b []batchKeys, region RegionVerID, keys [][]byte, sizeFn
 	}
 	return b
 }
 
-// newTaskProcessor creates a processor to handle concurrent batch work (prewrite/commit etc.)
-func newTaskProcessor(rateLimit int) *taskProcessor {
-	return &taskProcessor{newRateLimit(rateLimit)}
+// newBatchExecutor creates a processor to handle concurrent batch work (prewrite/commit etc.)
+func newBatchExecutor(rateLimit int, committer *twoPhaseCommitter,
+	action twoPhaseCommitAction, procFn procOneBatchFn, backoffer *Backoffer) *batchExecutor {
+	return &batchExecutor{newRateLimit(rateLimit), committer,
+		action, procFn, backoffer}
 }
 
 // startWorker concurrently does the work for each batch, subject to the rate limit
-func (tp *taskProcessor) startWorker(exitCh chan struct{}, ch chan error,
-	procFn procFunc, action twoPhaseCommitAction,
-	backoffer *Backoffer, batches []batchKeys) {
+func (batchExe *batchExecutor) startWorker(exitCh chan struct{}, ch chan error, batches []batchKeys) {
 	for idx, batch1 := range batches {
-		if exit := tp.rateLimiter.getToken(exitCh); !exit {
+		if exit := batchExe.rateLimiter.getToken(exitCh); !exit {
 			batch := batch1
 			go func() {
 				var procRes error
-				defer tp.rateLimiter.putToken()
-				if action == actionCommit {
+				defer batchExe.rateLimiter.putToken()
+				if batchExe.action == actionCommit {
 					// Because the secondary batches of the commit actions are implemented to be
 					// committed asynchronously in background goroutines, we should not
 					// fork a child context and call cancel() while the foreground goroutine exits.
 					// Otherwise the background goroutines will be canceled exceptionally.
 					// Here we make a new clone of the original backoffer for this goroutine
 					// exclusively to avoid the data race when using the same backoffer
 					// in concurrent goroutines.
-					singleBatchBackoffer := backoffer.Clone()
-					procRes = procFn(singleBatchBackoffer, batch)
+					singleBatchBackoffer := batchExe.backoffer.Clone()
+					procRes = batchExe.procFn(singleBatchBackoffer, batch)
 				} else {
-					singleBatchBackoffer, singleBatchCancel := backoffer.Fork()
+					singleBatchBackoffer, singleBatchCancel := batchExe.backoffer.Fork()
 					defer singleBatchCancel()
-					procRes = procFn(singleBatchBackoffer, batch)
+					procRes = batchExe.procFn(singleBatchBackoffer, batch)
 				}
 				ch <- procRes
 			}()
 		} else {
-			logutil.Logger(backoffer.ctx).Info("break startWorker", zap.Stringer("action", action),
-				zap.Int("batch size", len(batches)), zap.Int("index", idx))
+			logutil.Logger(batchExe.backoffer.ctx).Info("break startWorker",
+				zap.Stringer("action", batchExe.action), zap.Int("batch size", len(batches)),
+				zap.Int("index", idx))
 			break
 		}
 	}
 }
 
 // process starts the worker goroutine and collects results
-func (tp *taskProcessor) process(c *twoPhaseCommitter, procFn procFunc,
-	bo *Backoffer, action twoPhaseCommitAction, batches []batchKeys) error {
+func (batchExe *batchExecutor) process(batches []batchKeys) error {
 	var err error
+	if len(batches) == 0 {
+		return nil
+	}
+	if len(batches) == 1 {
+		e := batchExe.procFn(batchExe.backoffer, batches[0])
+		if e != nil {
+			logutil.BgLogger().Debug("2PC doActionOnBatches failed",
+				zap.Uint64("conn", batchExe.committer.connID),
+				zap.Stringer("action type", batchExe.action),
+				zap.Error(e),
+				zap.Uint64("txnStartTS", batchExe.committer.startTS))
+		}
+		return errors.Trace(e)
+	}
 	// For prewrite, stop sending other requests after receiving first error.
-	backoffer := bo
+	backoffer := batchExe.backoffer
 	var cancel context.CancelFunc
-	if action == actionPrewrite {
-		backoffer, cancel = bo.Fork()
+	if batchExe.action == actionPrewrite {
+		backoffer, cancel = batchExe.backoffer.Fork()
 		defer cancel()
 	}
 	// concurrently do the work for each batch.
 	ch := make(chan error, len(batches))
 	exitCh := make(chan struct{})
-	go tp.startWorker(exitCh, ch, procFn, action, backoffer, batches)
+	go batchExe.startWorker(exitCh, ch, batches)
 	// check results
 	for i := 0; i < len(batches); i++ {
 		if e := <-ch; e != nil {
 			logutil.Logger(backoffer.ctx).Debug("2PC doActionOnBatches failed",
-				zap.Uint64("conn", c.connID),
-				zap.Stringer("action type", action),
+				zap.Uint64("conn", batchExe.committer.connID),
+				zap.Stringer("action type", batchExe.action),
 				zap.Error(e),
-				zap.Uint64("txnStartTS", c.startTS))
+				zap.Uint64("txnStartTS", batchExe.committer.startTS))
 			// Cancel other requests and return the first error.
 			if cancel != nil {
 				logutil.Logger(backoffer.ctx).Debug("2PC doActionOnBatches to cancel other actions",
-					zap.Uint64("conn", c.connID),
-					zap.Stringer("action type", action),
-					zap.Uint64("txnStartTS", c.startTS))
+					zap.Uint64("conn", batchExe.committer.connID),
+					zap.Stringer("action type", batchExe.action),
+					zap.Uint64("txnStartTS", batchExe.committer.startTS))
 				cancel()
 			}
 			if err == nil {
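A note on the exitCh handshake in process/startWorker: process closes exitCh only after it has collected a result for every batch, so in the normal flow getToken never reports exit; the "break startWorker" branch is a defensive path for a producer still waiting on a token when the channel closes. A standalone toy of the same pattern — raw channels instead of the real committer types, and it closes exitCh early on purpose just to force the exit path:

package main

import (
	"fmt"
	"time"
)

func main() {
	token := make(chan struct{}, 2) // rateLim = 2 worker slots
	exitCh := make(chan struct{})
	results := make(chan error, 8)

	// producer, like startWorker: schedule one goroutine per batch
	go func() {
		for i := 0; i < 8; i++ {
			select {
			case token <- struct{}{}: // getToken succeeded
				go func() {
					defer func() { <-token }() // putToken
					time.Sleep(10 * time.Millisecond) // stand-in for one batch
					results <- nil
				}()
			case <-exitCh: // getToken reported exit=true
				fmt.Println("break startWorker at batch", i)
				return
			}
		}
	}()

	// consumer, like process, but stopping early to trigger the exit path
	for i := 0; i < 3; i++ {
		<-results
	}
	close(exitCh) // unblocks a producer stuck acquiring a slot
	time.Sleep(100 * time.Millisecond)
}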
From 3ea5ba862f2545477f7d423d67b967195a7c08c8 Mon Sep 17 00:00:00 2001
From: cfzjywxk
Date: Fri, 23 Aug 2019 13:57:28 +0800
Subject: [PATCH 4/6] do init limiter if needed

---
 store/tikv/2pc.go | 18 ++++++++++++++++--
 1 file changed, 16 insertions(+), 2 deletions(-)

diff --git a/store/tikv/2pc.go b/store/tikv/2pc.go
index 781fcc8bdf036..89f940d74cf56 100644
--- a/store/tikv/2pc.go
+++ b/store/tikv/2pc.go
@@ -117,7 +117,8 @@ type twoPhaseCommitter struct {
 
 // batchExecutor is a txn controller providing rate-control utilities
 type batchExecutor struct {
-	rateLimiter *rateLimit // rate limit for concurrency control, maybe more strategies
+	rateLim int // concurrent worker number
+	rateLimiter *rateLimit // rate limiter for concurrency control, maybe more strategies
 	committer *twoPhaseCommitter // here maybe more different type committer in the future
 	action twoPhaseCommitAction // the work action type
 	procFn procOneBatchFn // injected proc batch function
 	backoffer *Backoffer // Backoffer
@@ -1011,10 +1012,17 @@ func appendBatchBySize(b []batchKeys, region RegionVerID, keys [][]byte, sizeFn
 // newBatchExecutor creates a processor to handle concurrent batch work (prewrite/commit etc.)
 func newBatchExecutor(rateLimit int, committer *twoPhaseCommitter,
 	action twoPhaseCommitAction, procFn procOneBatchFn, backoffer *Backoffer) *batchExecutor {
-	return &batchExecutor{newRateLimit(rateLimit), committer,
+	return &batchExecutor{rateLimit, nil, committer,
 		action, procFn, backoffer}
 }
 
+// initUtils initializes batchExecutor-related policies such as the rateLimit util
+func (batchExe *batchExecutor) initUtils() error {
+	// init rateLimiter with the injected rate limit number
+	batchExe.rateLimiter = newRateLimit(batchExe.rateLim)
+	return nil
+}
+
 // startWorker concurrently does the work for each batch, subject to the rate limit
 func (batchExe *batchExecutor) startWorker(exitCh chan struct{}, ch chan error, batches []batchKeys) {
 	for idx, batch1 := range batches {
@@ -1066,6 +1074,12 @@ func (batchExe *batchExecutor) process(batches []batchKeys) error {
 		}
 		return errors.Trace(e)
 	}
+	err = batchExe.initUtils()
+	if err != nil {
+		logutil.Logger(batchExe.backoffer.ctx).Error("batchExecutor initUtils failed", zap.Error(err))
+		return err
+	}
+
 	// For prewrite, stop sending other requests after receiving first error.
 	backoffer := batchExe.backoffer
 	var cancel context.CancelFunc

From 494203159f59568632b946b8ad02baa80ea3c890 Mon Sep 17 00:00:00 2001
From: cfzjywxk
Date: Tue, 27 Aug 2019 16:40:45 +0800
Subject: [PATCH 5/6] add metric to observe token wait slow

---
 metrics/metrics.go    |  1 +
 metrics/tikvclient.go |  8 ++++++++
 store/tikv/2pc.go     | 36 +++++++++++++++++++++---------------
 3 files changed, 30 insertions(+), 15 deletions(-)

diff --git a/metrics/metrics.go b/metrics/metrics.go
index a31d5ec3837c6..fd7d65ff571ec 100644
--- a/metrics/metrics.go
+++ b/metrics/metrics.go
@@ -152,4 +152,5 @@ func RegisterMetrics() {
 	prometheus.MustRegister(TiKVBatchClientUnavailable)
 	prometheus.MustRegister(TiKVRangeTaskStats)
 	prometheus.MustRegister(TiKVRangeTaskPushDuration)
+	prometheus.MustRegister(TiKVTokenWaitDuration)
 }

diff --git a/metrics/tikvclient.go b/metrics/tikvclient.go
index f4cb704f7d38a..704647ab5a0bd 100644
--- a/metrics/tikvclient.go
+++ b/metrics/tikvclient.go
@@ -223,4 +223,12 @@ var (
 			Buckets: prometheus.ExponentialBuckets(0.001, 2, 20),
 			Help: "duration to push sub tasks to range task workers",
 		}, []string{LblType})
+	TiKVTokenWaitDuration = prometheus.NewHistogram(
+		prometheus.HistogramOpts{
+			Namespace: "tidb",
+			Subsystem: "tikvclient",
+			Name: "batch_executor_token_wait_duration",
+			Buckets: prometheus.ExponentialBuckets(1, 2, 30), // 1ns ~ 1s
+			Help: "tidb txn token wait duration to process batches",
+		})
 )

diff --git a/store/tikv/2pc.go b/store/tikv/2pc.go
index 6267bc22f4cd8..a9a8d80f2fe7d 100644
--- a/store/tikv/2pc.go
+++ b/store/tikv/2pc.go
@@ -123,6 +123,7 @@ type batchExecutor struct {
 	action twoPhaseCommitAction // the work action type
 	procFn procOneBatchFn // injected proc batch function
 	backoffer *Backoffer // Backoffer
+	threshold time.Duration // get token observe threshold
 }
 
 type procOneBatchFn func(bo *Backoffer, batch batchKeys) error
@@ -393,6 +394,20 @@ func (c *twoPhaseCommitter) doActionOnBatches(bo *Backoffer, action twoPhaseComm
 	if err != nil {
 		return err
 	}
+	if len(batches) == 0 {
+		return nil
+	}
+	if len(batches) == 1 {
+		e := singleBatchActionFunc(bo, batches[0])
+		if e != nil {
+			logutil.BgLogger().Debug("2PC doActionOnBatches failed",
+				zap.Uint64("conn", c.connID),
+				zap.Stringer("action type", action),
+				zap.Error(e),
+				zap.Uint64("txnStartTS", c.startTS))
+		}
+		return errors.Trace(e)
+	}
 	rateLim := len(batches) // this will be used for LargeTxn, set rateLim here
 	batchExecutor := newBatchExecutor(rateLim, c, action, singleBatchActionFunc, bo)
 	err = batchExecutor.process(batches)
@@ -1029,7 +1044,7 @@ func appendBatchBySize(b []batchKeys, region RegionVerID, keys [][]byte, sizeFn
 func newBatchExecutor(rateLimit int, committer *twoPhaseCommitter,
 	action twoPhaseCommitAction, procFn procOneBatchFn, backoffer *Backoffer) *batchExecutor {
 	return &batchExecutor{rateLimit, nil, committer,
-		action, procFn, backoffer}
+		action, procFn, backoffer, time.Duration(1 * time.Millisecond)}
 }
 
 // initUtils initializes batchExecutor-related policies such as the rateLimit util
@@ -1042,7 +1057,12 @@ func (batchExe *batchExecutor) initUtils() error {
 
 // startWorker concurrently does the work for each batch, subject to the rate limit
 func (batchExe *batchExecutor) startWorker(exitCh chan struct{}, ch chan error, batches []batchKeys) {
 	for idx, batch1 := range batches {
+		waitStart := time.Now()
 		if exit := batchExe.rateLimiter.getToken(exitCh); !exit {
+			tokenWaitDuration := time.Since(waitStart)
+			if tokenWaitDuration > batchExe.threshold {
+				metrics.TiKVTokenWaitDuration.Observe(float64(tokenWaitDuration))
+			}
 			batch := batch1
 			go func() {
 				var singleBatchBackoffer *Backoffer
@@ -1084,20 +1104,6 @@ func (batchExe *batchExecutor) startWorker(exitCh chan struct{}, ch chan error,
 
 // process starts the worker goroutine and collects results
 func (batchExe *batchExecutor) process(batches []batchKeys) error {
 	var err error
-	if len(batches) == 0 {
-		return nil
-	}
-	if len(batches) == 1 {
-		e := batchExe.procFn(batchExe.backoffer, batches[0])
-		if e != nil {
-			logutil.BgLogger().Debug("2PC doActionOnBatches failed",
-				zap.Uint64("conn", batchExe.committer.connID),
-				zap.Stringer("action type", batchExe.action),
-				zap.Error(e),
-				zap.Uint64("txnStartTS", batchExe.committer.startTS))
-		}
-		return errors.Trace(e)
-	}
 	err = batchExe.initUtils()
 	if err != nil {
 		logutil.Logger(batchExe.backoffer.ctx).Error("batchExecutor initUtils failed", zap.Error(err))
 		return err
 	}

From 8261507a82b5ea1279344e46517b22249f05b5fa Mon Sep 17 00:00:00 2001
From: cfzjywxk
Date: Tue, 27 Aug 2019 17:45:15 +0800
Subject: [PATCH 6/6] change wait duration observe per batchExecutor

---
 store/tikv/2pc.go | 20 +++++++++-----------
 1 file changed, 9 insertions(+), 11 deletions(-)

diff --git a/store/tikv/2pc.go b/store/tikv/2pc.go
index a9a8d80f2fe7d..05ba781a50ba8 100644
--- a/store/tikv/2pc.go
+++ b/store/tikv/2pc.go
@@ -117,13 +117,13 @@ type twoPhaseCommitter struct {
 
 // batchExecutor is a txn controller providing rate-control utilities
 type batchExecutor struct {
-	rateLim int // concurrent worker number
-	rateLimiter *rateLimit // rate limiter for concurrency control, maybe more strategies
-	committer *twoPhaseCommitter // here maybe more different type committer in the future
-	action twoPhaseCommitAction // the work action type
-	procFn procOneBatchFn // injected proc batch function
-	backoffer *Backoffer // Backoffer
-	threshold time.Duration // get token observe threshold
+	rateLim           int                  // concurrent worker number
+	rateLimiter       *rateLimit           // rate limiter for concurrency control, maybe more strategies
+	committer         *twoPhaseCommitter   // here maybe more different type committer in the future
+	action            twoPhaseCommitAction // the work action type
+	procFn            procOneBatchFn       // injected proc batch function
+	backoffer         *Backoffer           // Backoffer
+	tokenWaitDuration time.Duration        // get token wait time
 }
 
 type procOneBatchFn func(bo *Backoffer, batch batchKeys) error
@@ -1059,10 +1059,7 @@ func (batchExe *batchExecutor) startWorker(exitCh chan struct{}, ch chan error,
 	for idx, batch1 := range batches {
 		waitStart := time.Now()
 		if exit := batchExe.rateLimiter.getToken(exitCh); !exit {
-			tokenWaitDuration := time.Since(waitStart)
-			if tokenWaitDuration > batchExe.threshold {
-				metrics.TiKVTokenWaitDuration.Observe(float64(tokenWaitDuration))
-			}
+			batchExe.tokenWaitDuration += time.Since(waitStart)
 			batch := batch1
 			go func() {
 				var singleBatchBackoffer *Backoffer
@@ -1143,5 +1140,6 @@ func (batchExe *batchExecutor) process(batches []batchKeys) error {
 		}
 	}
 	close(exitCh)
+	metrics.TiKVTokenWaitDuration.Observe(float64(batchExe.tokenWaitDuration))
 	return err
 }
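A unit note on the final observation: time.Duration is an int64 nanosecond count, so Observe(float64(batchExe.tokenWaitDuration)) records nanoseconds, and ExponentialBuckets(1, 2, 30) yields finite buckets from 1ns up to 2^29ns (about 0.54s); any larger accumulated wait lands in the +Inf bucket. A tiny self-contained check of that arithmetic (hypothetical wait values, not taken from the patch):

package main

import (
	"fmt"
	"time"
)

func main() {
	// Simulate the per-executor accumulation done in startWorker.
	var total time.Duration
	total += 250 * time.Microsecond // e.g. first getToken wait
	total += time.Millisecond       // e.g. second getToken wait

	// This is the value the histogram would receive: nanoseconds.
	fmt.Println(float64(total)) // 1.25e+06
}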