Skip to content

Commit

Permalink
coprocessor: Exceed action for copiterator (pingcap#17324) (pingcap#1…
Browse files Browse the repository at this point in the history
  • Loading branch information
ti-srebot authored Jul 15, 2020
1 parent b4da443 commit fe54331
Show file tree
Hide file tree
Showing 3 changed files with 100 additions and 0 deletions.
9 changes: 9 additions & 0 deletions executor/table_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"sort"

"github.com/opentracing/opentracing-go"
"github.com/pingcap/errors"
"github.com/pingcap/parser/model"
"github.com/pingcap/tidb/distsql"
"github.com/pingcap/tidb/expression"
Expand Down Expand Up @@ -139,6 +140,14 @@ func (e *TableReaderExecutor) Open(ctx context.Context) error {
e.feedback.Invalidate()
return err
}

actionExceed := e.memTracker.GetActionOnExceed()
if actionExceed != nil {
e.ctx.GetSessionVars().StmtCtx.MemTracker.FallbackOldAndSetNewAction(actionExceed)
} else {
return errors.Trace(fmt.Errorf("failed to find actionExceed in TableReaderExecutor Open phase"))
}

if len(secondPartRanges) == 0 {
e.resultHandler.open(nil, firstResult)
return nil
Expand Down
84 changes: 84 additions & 0 deletions store/tikv/coprocessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,12 @@ func (c *CopClient) Send(ctx context.Context, req *kv.Request, vars *kv.Variable
vars: vars,
memTracker: req.MemTracker,
replicaReadSeed: c.replicaReadSeed,
actionOnExceed: &EndCopWorkerAction{},
}
if it.memTracker != nil {
it.memTracker.FallbackOldAndSetNewAction(it.actionOnExceed)
}

it.minCommitTSPushed.data = make(map[uint64]struct{}, 5)
it.tasks = tasks
if it.concurrency > len(tasks) {
Expand All @@ -90,6 +95,7 @@ func (c *CopClient) Send(ctx context.Context, req *kv.Request, vars *kv.Variable
} else {
it.respChan = make(chan *copResponse, it.concurrency)
}
it.actionOnExceed.mu.aliveWorker = it.concurrency
it.open(ctx)
return it
}
Expand Down Expand Up @@ -397,10 +403,13 @@ type copIterator struct {
closed uint32

minCommitTSPushed

actionOnExceed *EndCopWorkerAction
}

// copIteratorWorker receives tasks from copIteratorTaskSender, handles tasks and sends the copResponse to respChan.
type copIteratorWorker struct {
id string
taskCh <-chan *copTask
wg *sync.WaitGroup
store *tikvStore
Expand All @@ -413,6 +422,8 @@ type copIteratorWorker struct {
memTracker *memory.Tracker

replicaReadSeed uint32

actionOnExceed *EndCopWorkerAction
}

// copIteratorTaskSender sends tasks to taskCh then wait for the workers to exit.
Expand Down Expand Up @@ -484,7 +495,14 @@ const minLogCopTaskTime = 300 * time.Millisecond
// send the result back.
func (worker *copIteratorWorker) run(ctx context.Context) {
defer worker.wg.Done()

for task := range worker.taskCh {
endWorker, remainWorkers := worker.checkWorkerOOM()
if endWorker {
logutil.BgLogger().Info("end one copIterator worker.",
zap.String("copIteratorWorker id", worker.id), zap.Int("remain alive worker", remainWorkers))
return
}
respCh := worker.respChan
if respCh == nil {
respCh = task.respChan
Expand All @@ -497,19 +515,39 @@ func (worker *copIteratorWorker) run(ctx context.Context) {
}
select {
case <-worker.finishCh:
worker.actionOnExceed.mu.Lock()
worker.actionOnExceed.mu.aliveWorker--
worker.actionOnExceed.mu.Unlock()
return
default:
}
}
}

func (worker *copIteratorWorker) checkWorkerOOM() (bool, int) {
endWorker := false
remainWorkers := 0
worker.actionOnExceed.mu.Lock()
defer worker.actionOnExceed.mu.Unlock()
if worker.actionOnExceed.mu.exceeded != 0 {
endWorker = true
worker.actionOnExceed.mu.aliveWorker--
remainWorkers = worker.actionOnExceed.mu.aliveWorker
// reset action
worker.actionOnExceed.mu.exceeded = 0
worker.actionOnExceed.once = sync.Once{}
}
return endWorker, remainWorkers
}

// open starts workers and sender goroutines.
func (it *copIterator) open(ctx context.Context) {
taskCh := make(chan *copTask, 1)
it.wg.Add(it.concurrency)
// Start it.concurrency number of workers to handle cop requests.
for i := 0; i < it.concurrency; i++ {
worker := &copIteratorWorker{
id: fmt.Sprintf("copIteratorWorker-%d", i),
taskCh: taskCh,
wg: &it.wg,
store: it.store,
Expand All @@ -527,6 +565,7 @@ func (it *copIterator) open(ctx context.Context) {
memTracker: it.memTracker,

replicaReadSeed: it.replicaReadSeed,
actionOnExceed: it.actionOnExceed,
}
go worker.run(ctx)
}
Expand Down Expand Up @@ -1150,3 +1189,48 @@ func (it copErrorResponse) Next(ctx context.Context) (kv.ResultSubset, error) {
func (it copErrorResponse) Close() error {
return nil
}

// EndCopWorkerAction implements memory.ActionOnExceed for copIteratorWorker. If
// the memory quota of a query is exceeded, EndCopWorkAction.Action would end one copIteratorWorker.
// If there is only one or zero worker is running, delegate to the fallback action.
type EndCopWorkerAction struct {
once sync.Once
fallbackAction memory.ActionOnExceed
mu struct {
sync.Mutex
// exceeded indicates that datasource have exceeded memQuota.
exceeded uint32

// alive worker indicates how many copIteratorWorker are running
aliveWorker int
}
}

// Action sends a signal to trigger end one copIterator worker.
func (e *EndCopWorkerAction) Action(t *memory.Tracker) {
e.mu.Lock()
defer e.mu.Unlock()
// only one or zero worker is running, delegate to the fallback action
if e.mu.aliveWorker < 2 {
if e.fallbackAction != nil {
e.fallbackAction.Action(t)
}
return
}
// set exceeded as 1
e.once.Do(func() {
e.mu.exceeded = 1
logutil.BgLogger().Info("memory exceeds quota, mark EndCopWorkerAction exceed signal.",
zap.Int64("consumed", t.BytesConsumed()), zap.Int64("quota", t.GetBytesLimit()), zap.Int64("maxConsumed", t.MaxConsumed()))
})
}

// SetLogHook implements ActionOnExceed.SetLogHook
func (e *EndCopWorkerAction) SetLogHook(hook func(uint64)) {

}

// SetFallback implements ActionOnExceed.SetFallback
func (e *EndCopWorkerAction) SetFallback(a memory.ActionOnExceed) {
e.fallbackAction = a
}
7 changes: 7 additions & 0 deletions util/memory/tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,13 @@ func (t *Tracker) SetActionOnExceed(a ActionOnExceed) {
t.actionMu.Unlock()
}

// GetActionOnExceed return the actionOnExceed
func (t *Tracker) GetActionOnExceed() ActionOnExceed {
t.actionMu.Lock()
defer t.actionMu.Unlock()
return t.actionMu.actionOnExceed
}

// FallbackOldAndSetNewAction sets the action when memory usage exceeds bytesLimit
// and set the original action as its fallback.
func (t *Tracker) FallbackOldAndSetNewAction(a ActionOnExceed) {
Expand Down

0 comments on commit fe54331

Please sign in to comment.