Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

store: add rateLimiter into txn commit batch actions #11817

Merged
merged 10 commits into from
Aug 28, 2019
142 changes: 90 additions & 52 deletions store/tikv/2pc.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,13 @@ type twoPhaseCommitter struct {
regionTxnSize map[uint64]int
}

// taskProcessor add rateLimiter to control concurrency of txn 2pc
type taskProcessor struct {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think batchExecutor is a better name.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can put more attributes in it, so we can reduce the argument count of process and startWorker.

rateLimiter *rateLimit
}

type procFunc func(bo *Backoffer, batch batchKeys) error
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is already singleBatchActionFunc defined in doActionOnBatches. We can move it here.


type mutationEx struct {
pb.Mutation
asserted bool
Expand Down Expand Up @@ -405,58 +412,9 @@ func (c *twoPhaseCommitter) doActionOnBatches(bo *Backoffer, action twoPhaseComm
return errors.Trace(e)
}

// For prewrite, stop sending other requests after receiving first error.
backoffer := bo
var cancel context.CancelFunc
if action == actionPrewrite {
backoffer, cancel = bo.Fork()
defer cancel()
}

// Concurrently do the work for each batch.
ch := make(chan error, len(batches))
for _, batch1 := range batches {

batch := batch1
go func() {
if action == actionCommit {
// Because the secondary batches of the commit actions are implemented to be
// committed asynchronously in background goroutines, we should not
// fork a child context and call cancel() while the foreground goroutine exits.
// Otherwise the background goroutines will be canceled execeptionally.
// Here we makes a new clone of the original backoffer for this goroutine
// exclusively to avoid the data race when using the same backoffer
// in concurrent goroutines.
singleBatchBackoffer := backoffer.Clone()
ch <- singleBatchActionFunc(singleBatchBackoffer, batch)
} else {
singleBatchBackoffer, singleBatchCancel := backoffer.Fork()
defer singleBatchCancel()
ch <- singleBatchActionFunc(singleBatchBackoffer, batch)
}
}()
}
var err error
for i := 0; i < len(batches); i++ {
if e := <-ch; e != nil {
logutil.BgLogger().Debug("2PC doActionOnBatches failed",
zap.Uint64("conn", c.connID),
zap.Stringer("action type", action),
zap.Error(e),
zap.Uint64("txnStartTS", c.startTS))
// Cancel other requests and return the first error.
if cancel != nil {
logutil.BgLogger().Debug("2PC doActionOnBatches to cancel other actions",
zap.Uint64("conn", c.connID),
zap.Stringer("action type", action),
zap.Uint64("txnStartTS", c.startTS))
cancel()
}
if err == nil {
err = e
}
}
}
rateLim := len(batches) // this will be used for LargeTxn
processor := newTaskProcessor(rateLim)
err := processor.process(c, singleBatchActionFunc, bo, action, batches)
return errors.Trace(err)
}

Expand Down Expand Up @@ -1050,3 +1008,83 @@ func appendBatchBySize(b []batchKeys, region RegionVerID, keys [][]byte, sizeFn
}
return b
}

// newTaskProcessor creates a processor that runs concurrent batch work
// (prewrite/commit etc.) while allowing at most rateLimit batches in flight.
func newTaskProcessor(rateLimit int) *taskProcessor {
	limiter := newRateLimit(rateLimit)
	return &taskProcessor{rateLimiter: limiter}
}

// startWork concurrently do the work for each batch considering rate limit
func (tp *taskProcessor) startWorker(exitCh chan struct{}, ch chan error,
procFn procFunc, action twoPhaseCommitAction,
backoffer *Backoffer, batches []batchKeys) {
for idx, batch1 := range batches {
if exit := tp.rateLimiter.getToken(exitCh); !exit {
batch := batch1
go func(i int) {
crazycs520 marked this conversation as resolved.
Show resolved Hide resolved
var procRes error
defer tp.rateLimiter.putToken()
if action == actionCommit {
// Because the secondary batches of the commit actions are implemented to be
// committed asynchronously in background goroutines, we should not
// fork a child context and call cancel() while the foreground goroutine exits.
// Otherwise the background goroutines will be canceled execeptionally.
// Here we makes a new clone of the original backoffer for this goroutine
// exclusively to avoid the data race when using the same backoffer
// in concurrent goroutines.
singleBatchBackoffer := backoffer.Clone()
procRes = procFn(singleBatchBackoffer, batch)
} else {
singleBatchBackoffer, singleBatchCancel := backoffer.Fork()
defer singleBatchCancel()
procRes = procFn(singleBatchBackoffer, batch)
}
ch <- procRes
}(idx)
} else {
logutil.Logger(backoffer.ctx).Info("break startWorker", zap.Stringer("action", action))
break
}
}
}

// process will start worker routine and collect results
func (tp *taskProcessor) process(committer *twoPhaseCommitter, procFn procFunc,
bo *Backoffer, action twoPhaseCommitAction, batches []batchKeys) error {
var err error
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can move all the logic in doActionOnBatches here.

c := committer
crazycs520 marked this conversation as resolved.
Show resolved Hide resolved
// For prewrite, stop sending other requests after receiving first error.
backoffer := bo
crazycs520 marked this conversation as resolved.
Show resolved Hide resolved
var cancel context.CancelFunc
if action == actionPrewrite {
backoffer, cancel = bo.Fork()
defer cancel()
}
// concurrently do the work for each batch.
ch := make(chan error, len(batches))
exitCh := make(chan struct{})
go tp.startWorker(exitCh, ch, procFn, action, backoffer, batches)
// check results
for i := 0; i < len(batches); i++ {
if e := <-ch; e != nil {
logutil.Logger(backoffer.ctx).Debug("2PC doActionOnBatches failed",
zap.Uint64("conn", c.connID),
zap.Stringer("action type", action),
zap.Error(e),
zap.Uint64("txnStartTS", c.startTS))
// Cancel other requests and return the first error.
if cancel != nil {
logutil.Logger(backoffer.ctx).Debug("2PC doActionOnBatches to cancel other actions",
zap.Uint64("conn", c.connID),
zap.Stringer("action type", action),
zap.Uint64("txnStartTS", c.startTS))
cancel()
}
if err == nil {
err = e
}
}
}
close(exitCh)
return err
}