Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

store: add rateLimiter into txn commit batch actions #11817

Merged
merged 10 commits into from
Aug 28, 2019
1 change: 1 addition & 0 deletions metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,4 +152,5 @@ func RegisterMetrics() {
prometheus.MustRegister(TiKVBatchClientUnavailable)
prometheus.MustRegister(TiKVRangeTaskStats)
prometheus.MustRegister(TiKVRangeTaskPushDuration)
prometheus.MustRegister(TiKVTokenWaitDuration)
}
8 changes: 8 additions & 0 deletions metrics/tikvclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -223,4 +223,12 @@ var (
Buckets: prometheus.ExponentialBuckets(0.001, 2, 20),
Help: "duration to push sub tasks to range task workers",
}, []string{LblType})
TiKVTokenWaitDuration = prometheus.NewHistogram(
prometheus.HistogramOpts{
Namespace: "tidb",
Subsystem: "tikvclient",
Name: "batch_executor_token_wait_duration",
Buckets: prometheus.ExponentialBuckets(1, 2, 30), // 1ns ~ 1s
Help: "tidb txn token wait duration to process batches",
})
)
218 changes: 142 additions & 76 deletions store/tikv/2pc.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,19 @@ type twoPhaseCommitter struct {
regionTxnSize map[uint64]int
}

// batchExecutor runs one two-phase-commit action over a list of batches,
// using a rate limiter to bound how many batches are processed concurrently.
// Field order matters: newBatchExecutor fills the struct positionally.
type batchExecutor struct {
rateLim int // number passed to newRateLimit in initUtils, i.e. the concurrent worker limit
rateLimiter *rateLimit // limiter created by initUtils; startWorker takes a token from it before each batch
committer *twoPhaseCommitter // committer whose commit detail, connID and startTS are used for stats and logging
action twoPhaseCommitAction // action being executed; decides Clone vs Fork per batch and whether errors cancel siblings
procFn procOneBatchFn // injected function that processes a single batch
backoffer *Backoffer // parent Backoffer from which each per-batch backoffer is cloned or forked
tokenWaitDuration time.Duration // accumulated time spent waiting in getToken; observed into TiKVTokenWaitDuration by process
}

// procOneBatchFn processes a single batch using the given Backoffer and
// returns the error produced for that batch, if any.
type procOneBatchFn func(bo *Backoffer, batch batchKeys) error

type mutationEx struct {
pb.Mutation
asserted bool
Expand Down Expand Up @@ -377,22 +390,13 @@ func (c *twoPhaseCommitter) doActionOnKeys(bo *Backoffer, action twoPhaseCommitA

// doActionOnBatches does action to batches in parallel.
func (c *twoPhaseCommitter) doActionOnBatches(bo *Backoffer, action twoPhaseCommitAction, batches []batchKeys) error {
singleBatchActionFunc, err := c.getProcFuncByType(action)
if err != nil {
return err
}
if len(batches) == 0 {
return nil
}
var singleBatchActionFunc func(bo *Backoffer, batch batchKeys) error
switch action {
case actionPrewrite:
singleBatchActionFunc = c.prewriteSingleBatch
case actionCommit:
singleBatchActionFunc = c.commitSingleBatch
case actionCleanup:
singleBatchActionFunc = c.cleanupSingleBatch
case actionPessimisticLock:
singleBatchActionFunc = c.pessimisticLockSingleBatch
case actionPessimisticRollback:
singleBatchActionFunc = c.pessimisticRollbackSingleBatch
}
if len(batches) == 1 {
e := singleBatchActionFunc(bo, batches[0])
if e != nil {
Expand All @@ -404,71 +408,29 @@ func (c *twoPhaseCommitter) doActionOnBatches(bo *Backoffer, action twoPhaseComm
}
return errors.Trace(e)
}
rateLim := len(batches) // this will be used for LargeTxn, set rateLim here
batchExecutor := newBatchExecutor(rateLim, c, action, singleBatchActionFunc, bo)
err = batchExecutor.process(batches)
return errors.Trace(err)
}

// For prewrite, stop sending other requests after receiving first error.
backoffer := bo
var cancel context.CancelFunc
if action == actionPrewrite {
backoffer, cancel = bo.Fork()
defer cancel()
}

// Concurrently do the work for each batch.
ch := make(chan error, len(batches))
for _, batch1 := range batches {

batch := batch1
go func() {
var singleBatchBackoffer *Backoffer
if action == actionCommit {
// Because the secondary batches of the commit actions are implemented to be
// committed asynchronously in background goroutines, we should not
// fork a child context and call cancel() while the foreground goroutine exits.
// Otherwise the background goroutines will be canceled exceptionally.
// Here we make a new clone of the original backoffer for this goroutine
// exclusively to avoid the data race when using the same backoffer
// in concurrent goroutines.
singleBatchBackoffer = backoffer.Clone()
} else {
var singleBatchCancel context.CancelFunc
singleBatchBackoffer, singleBatchCancel = backoffer.Fork()
defer singleBatchCancel()
}
beforeSleep := singleBatchBackoffer.totalSleep
ch <- singleBatchActionFunc(singleBatchBackoffer, batch)
commitDetail := c.getDetail()
if commitDetail != nil { // lock operations of pessimistic-txn will let commitDetail be nil
if delta := singleBatchBackoffer.totalSleep - beforeSleep; delta > 0 {
atomic.AddInt64(&commitDetail.CommitBackoffTime, int64(singleBatchBackoffer.totalSleep-beforeSleep)*int64(time.Millisecond))
commitDetail.Mu.Lock()
commitDetail.Mu.BackoffTypes = append(commitDetail.Mu.BackoffTypes, singleBatchBackoffer.types...)
commitDetail.Mu.Unlock()
}
}
}()
}
var err error
for i := 0; i < len(batches); i++ {
if e := <-ch; e != nil {
logutil.BgLogger().Debug("2PC doActionOnBatches failed",
zap.Uint64("conn", c.connID),
zap.Stringer("action type", action),
zap.Error(e),
zap.Uint64("txnStartTS", c.startTS))
// Cancel other requests and return the first error.
if cancel != nil {
logutil.BgLogger().Debug("2PC doActionOnBatches to cancel other actions",
zap.Uint64("conn", c.connID),
zap.Stringer("action type", action),
zap.Uint64("txnStartTS", c.startTS))
cancel()
}
if err == nil {
err = e
}
}
// getProcFuncByType returns the committer method that processes a single
// batch for the given action.
//
// An error is returned when action is not one of the action types handled
// below, so a caller never receives a nil function together with a nil error.
func (c *twoPhaseCommitter) getProcFuncByType(action twoPhaseCommitAction) (procOneBatchFn, error) {
	switch action {
	case actionPrewrite:
		return c.prewriteSingleBatch, nil
	case actionCommit:
		return c.commitSingleBatch, nil
	case actionCleanup:
		return c.cleanupSingleBatch, nil
	case actionPessimisticLock:
		return c.pessimisticLockSingleBatch, nil
	case actionPessimisticRollback:
		return c.pessimisticRollbackSingleBatch, nil
	default:
		return nil, errors.Errorf("invalid action type=%v", action)
	}
}

func (c *twoPhaseCommitter) keyValueSize(key []byte) int {
Expand Down Expand Up @@ -1077,3 +1039,107 @@ func appendBatchBySize(b []batchKeys, region RegionVerID, keys [][]byte, sizeFn
}
return b
}

// newBatchExecutor builds the executor that runs procFn over batches for the
// given action (prewrite, commit, ...), allowing at most rateLimit batches to
// hold a token at once. The rate limiter itself is left nil here and is
// created later by initUtils.
func newBatchExecutor(rateLimit int, committer *twoPhaseCommitter,
	action twoPhaseCommitAction, procFn procOneBatchFn, backoffer *Backoffer) *batchExecutor {
	executor := &batchExecutor{
		rateLim:   rateLimit,
		committer: committer,
		action:    action,
		procFn:    procFn,
		backoffer: backoffer,
		// NOTE(review): the wait counter starts at 1ms rather than zero, so the
		// value later observed by process always includes this offset — confirm
		// that is intended.
		tokenWaitDuration: time.Millisecond,
	}
	return executor
}

// initUtils prepares the helpers the executor relies on. At present that is
// only the rate limiter, sized by the rateLim value given at construction.
// It always returns nil; the error result leaves room for future policies.
func (batchExe *batchExecutor) initUtils() error {
	limiter := newRateLimit(batchExe.rateLim)
	batchExe.rateLimiter = limiter
	return nil
}

// startWork concurrently do the work for each batch considering rate limit
func (batchExe *batchExecutor) startWorker(exitCh chan struct{}, ch chan error, batches []batchKeys) {
for idx, batch1 := range batches {
waitStart := time.Now()
if exit := batchExe.rateLimiter.getToken(exitCh); !exit {
lysu marked this conversation as resolved.
Show resolved Hide resolved
batchExe.tokenWaitDuration += time.Since(waitStart)
batch := batch1
go func() {
var singleBatchBackoffer *Backoffer
if batchExe.action == actionCommit {
// Because the secondary batches of the commit actions are implemented to be
// committed asynchronously in background goroutines, we should not
// fork a child context and call cancel() while the foreground goroutine exits.
// Otherwise the background goroutines will be canceled execeptionally.
// Here we makes a new clone of the original backoffer for this goroutine
// exclusively to avoid the data race when using the same backoffer
// in concurrent goroutines.
singleBatchBackoffer = batchExe.backoffer.Clone()
} else {
var singleBatchCancel context.CancelFunc
singleBatchBackoffer, singleBatchCancel = batchExe.backoffer.Fork()
defer singleBatchCancel()
}
beforeSleep := singleBatchBackoffer.totalSleep
ch <- batchExe.procFn(singleBatchBackoffer, batch)
commitDetail := batchExe.committer.getDetail()
if commitDetail != nil { // lock operations of pessimistic-txn will let commitDetail be nil
if delta := singleBatchBackoffer.totalSleep - beforeSleep; delta > 0 {
atomic.AddInt64(&commitDetail.CommitBackoffTime, int64(singleBatchBackoffer.totalSleep-beforeSleep)*int64(time.Millisecond))
commitDetail.Mu.Lock()
commitDetail.Mu.BackoffTypes = append(commitDetail.Mu.BackoffTypes, singleBatchBackoffer.types...)
commitDetail.Mu.Unlock()
}
}
}()
} else {
logutil.Logger(batchExe.backoffer.ctx).Info("break startWorker",
zap.Stringer("action", batchExe.action), zap.Int("batch size", len(batches)),
zap.Int("index", idx))
break
}
}
}

// process will start worker routine and collect results
func (batchExe *batchExecutor) process(batches []batchKeys) error {
var err error
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can move all the logic in doActionOnBatches here.

err = batchExe.initUtils()
if err != nil {
logutil.Logger(batchExe.backoffer.ctx).Error("batchExecutor initUtils failed", zap.Error(err))
return err
}

// For prewrite, stop sending other requests after receiving first error.
backoffer := batchExe.backoffer
var cancel context.CancelFunc
if batchExe.action == actionPrewrite {
backoffer, cancel = batchExe.backoffer.Fork()
defer cancel()
}
// concurrently do the work for each batch.
ch := make(chan error, len(batches))
exitCh := make(chan struct{})
go batchExe.startWorker(exitCh, ch, batches)
// check results
for i := 0; i < len(batches); i++ {
if e := <-ch; e != nil {
logutil.Logger(backoffer.ctx).Debug("2PC doActionOnBatches failed",
zap.Uint64("conn", batchExe.committer.connID),
zap.Stringer("action type", batchExe.action),
zap.Error(e),
zap.Uint64("txnStartTS", batchExe.committer.startTS))
// Cancel other requests and return the first error.
if cancel != nil {
logutil.Logger(backoffer.ctx).Debug("2PC doActionOnBatches to cancel other actions",
zap.Uint64("conn", batchExe.committer.connID),
zap.Stringer("action type", batchExe.action),
zap.Uint64("txnStartTS", batchExe.committer.startTS))
cancel()
}
if err == nil {
err = e
}
}
}
close(exitCh)
metrics.TiKVTokenWaitDuration.Observe(float64(batchExe.tokenWaitDuration))
return err
}