Skip to content

Commit

Permalink
executor: support tidb memory debug mode (#35322)
Browse files Browse the repository at this point in the history
ref #33877
  • Loading branch information
wshwsh12 authored Jul 13, 2022
1 parent da1b82a commit bdc6397
Show file tree
Hide file tree
Showing 7 changed files with 207 additions and 0 deletions.
152 changes: 152 additions & 0 deletions executor/explain.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,22 @@ package executor

import (
"context"
"os"
"path/filepath"
"runtime"
rpprof "runtime/pprof"
"strconv"
"sync"
"time"

"github.com/pingcap/errors"
"github.com/pingcap/tidb/config"
"github.com/pingcap/tidb/planner/core"
"github.com/pingcap/tidb/util/chunk"
"github.com/pingcap/tidb/util/logutil"
"github.com/pingcap/tidb/util/mathutil"
"github.com/pingcap/tidb/util/memory"
"go.uber.org/zap"
)

// ExplainExec represents an explain executor.
Expand Down Expand Up @@ -89,6 +100,24 @@ func (e *ExplainExec) executeAnalyzeExec(ctx context.Context) (err error) {
}
}
}()
if minHeapInUse, alarmRatio := e.ctx.GetSessionVars().MemoryDebugModeMinHeapInUse, e.ctx.GetSessionVars().MemoryDebugModeAlarmRatio; minHeapInUse != 0 && alarmRatio != 0 {
memoryDebugModeCtx, cancel := context.WithCancel(ctx)
waitGroup := sync.WaitGroup{}
waitGroup.Add(1)
defer func() {
// Notify and wait debug goroutine exit.
cancel()
waitGroup.Wait()
}()
go (&memoryDebugModeHandler{
ctx: memoryDebugModeCtx,
minHeapInUse: mathutil.Abs(minHeapInUse),
alarmRatio: alarmRatio,
autoGC: minHeapInUse > 0,
memTracker: e.ctx.GetSessionVars().StmtCtx.MemTracker,
wg: &waitGroup,
}).run()
}
e.executed = true
chk := newFirstChunk(e.analyzeExec)
for {
Expand Down Expand Up @@ -123,3 +152,126 @@ func (e *ExplainExec) getAnalyzeExecToExecutedNoDelay() Executor {
}
return nil
}

type memoryDebugModeHandler struct {
ctx context.Context
minHeapInUse int64
alarmRatio int64
autoGC bool
wg *sync.WaitGroup
memTracker *memory.Tracker

infoField []zap.Field
}

func (h *memoryDebugModeHandler) fetchCurrentMemoryUsage(gc bool) (heapInUse, trackedMem uint64) {
if gc {
runtime.GC()
}
instanceStats := &runtime.MemStats{}
runtime.ReadMemStats(instanceStats)
heapInUse = instanceStats.HeapInuse
trackedMem = uint64(h.memTracker.BytesConsumed())
return
}

func (h *memoryDebugModeHandler) genInfo(status string, needProfile bool, heapInUse, trackedMem int64) (fields []zap.Field, err error) {
var fileName string
h.infoField = h.infoField[:0]
h.infoField = append(h.infoField, zap.String("sql", status))
h.infoField = append(h.infoField, zap.String("heap in use", memory.FormatBytes(heapInUse)))
h.infoField = append(h.infoField, zap.String("tracked memory", memory.FormatBytes(trackedMem)))
if needProfile {
fileName, err = getHeapProfile()
h.infoField = append(h.infoField, zap.String("heap profile", fileName))
}
return h.infoField, err
}

func (h *memoryDebugModeHandler) run() {
var err error
var fields []zap.Field
defer func() {
heapInUse, trackedMem := h.fetchCurrentMemoryUsage(true)
if err == nil {
fields, err := h.genInfo("finished", true, int64(heapInUse), int64(trackedMem))
logutil.BgLogger().Info("Memory Debug Mode", fields...)
if err != nil {
logutil.BgLogger().Error("Memory Debug Mode Exit", zap.Error(err))
}
} else {
fields, err := h.genInfo("debug_mode_error", false, int64(heapInUse), int64(trackedMem))
logutil.BgLogger().Error("Memory Debug Mode", fields...)
logutil.BgLogger().Error("Memory Debug Mode Exit", zap.Error(err))
}
h.wg.Done()
}()

logutil.BgLogger().Info("Memory Debug Mode",
zap.String("sql", "started"),
zap.Bool("autoGC", h.autoGC),
zap.String("minHeapInUse", memory.FormatBytes(h.minHeapInUse)),
zap.Int64("alarmRatio", h.alarmRatio),
)
ticker, loop := time.NewTicker(5*time.Second), 0
for {
select {
case <-h.ctx.Done():
return
case <-ticker.C:
heapInUse, trackedMem := h.fetchCurrentMemoryUsage(h.autoGC)
loop++
if loop%6 == 0 {
fields, err = h.genInfo("running", false, int64(heapInUse), int64(trackedMem))
logutil.BgLogger().Info("Memory Debug Mode", fields...)
if err != nil {
return
}
}

if !h.autoGC {
if heapInUse > uint64(h.minHeapInUse) && trackedMem/100*uint64(100+h.alarmRatio) < heapInUse {
fields, err = h.genInfo("warning", true, int64(heapInUse), int64(trackedMem))
logutil.BgLogger().Warn("Memory Debug Mode", fields...)
if err != nil {
return
}
}
} else {
if heapInUse > uint64(h.minHeapInUse) && trackedMem/100*uint64(100+h.alarmRatio) < heapInUse {
fields, err = h.genInfo("warning", true, int64(heapInUse), int64(trackedMem))
logutil.BgLogger().Warn("Memory Debug Mode", fields...)
if err != nil {
return
}
ts := h.memTracker.SearchTrackerConsumedMoreThanNBytes(h.minHeapInUse / 5)
logs := make([]zap.Field, 0, len(ts))
for _, t := range ts {
logs = append(logs, zap.String("Executor_"+strconv.Itoa(t.Label()), memory.FormatBytes(t.BytesConsumed())))
}
logutil.BgLogger().Warn("Memory Debug Mode, Log all trackers that consumes more than threshold * 20%", logs...)
}
}
}
}
}

func getHeapProfile() (fileName string, err error) {
tempDir := filepath.Join(config.GetGlobalConfig().TempStoragePath, "record")
timeString := time.Now().Format(time.RFC3339)
fileName = filepath.Join(tempDir, "heapGC"+timeString)
f, err := os.Create(fileName)
if err != nil {
return "", err
}
p := rpprof.Lookup("heap")
err = p.WriteTo(f, 0)
if err != nil {
return "", err
}
err = f.Close()
if err != nil {
return "", err
}
return fileName, nil
}
7 changes: 7 additions & 0 deletions planner/core/common_plans.go
Original file line number Diff line number Diff line change
Expand Up @@ -1238,6 +1238,13 @@ func (e *Explain) RenderResult() error {
if e.Rows == nil || e.Analyze {
flat := FlattenPhysicalPlan(e.TargetPlan, true)
e.explainFlatPlanInRowFormat(flat)
if e.Analyze &&
e.SCtx().GetSessionVars().MemoryDebugModeMinHeapInUse != 0 &&
e.SCtx().GetSessionVars().MemoryDebugModeAlarmRatio > 0 {
row := e.Rows[0]
tracker := e.SCtx().GetSessionVars().StmtCtx.MemTracker
row[7] = row[7] + "(Total: " + tracker.FormatBytes(tracker.MaxConsumed()) + ")"
}
}
case types.ExplainFormatDOT:
if physicalPlan, ok := e.TargetPlan.(PhysicalPlan); ok {
Expand Down
6 changes: 6 additions & 0 deletions sessionctx/variable/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -1165,6 +1165,12 @@ type SessionVars struct {

// RequestSourceType is the type of inner request.
RequestSourceType string

// MemoryDebugModeMinHeapInUse indicated the minimum heapInUse threshold that triggers the memoryDebugMode.
MemoryDebugModeMinHeapInUse int64
// MemoryDebugModeAlarmRatio indicated the allowable bias ratio of memory tracking accuracy check.
// When `(memory trakced by tidb) * (1+MemoryDebugModeAlarmRatio) < actual heapInUse`, an alarm log will be recorded.
MemoryDebugModeAlarmRatio int64
}

// InitStatementContext initializes a StatementContext, the object is reused to reduce allocation.
Expand Down
8 changes: 8 additions & 0 deletions sessionctx/variable/sysvar.go
Original file line number Diff line number Diff line change
Expand Up @@ -1667,6 +1667,14 @@ var defaultSysVars = []*SysVar{
metrics.ToggleSimplifiedMode(TiDBOptOn(s))
return nil
}},
{Scope: ScopeSession, Name: TiDBMemoryDebugModeMinHeapInUse, Value: strconv.Itoa(0), Type: TypeInt, MinValue: math.MinInt64, MaxValue: math.MaxInt64, SetSession: func(s *SessionVars, val string) error {
s.MemoryDebugModeMinHeapInUse = TidbOptInt64(val, 0)
return nil
}},
{Scope: ScopeSession, Name: TiDBMemoryDebugModeAlarmRatio, Value: strconv.Itoa(0), Type: TypeInt, MinValue: 0, MaxValue: math.MaxInt64, SetSession: func(s *SessionVars, val string) error {
s.MemoryDebugModeAlarmRatio = TidbOptInt64(val, 0)
return nil
}},
}

// FeedbackProbability points to the FeedbackProbability in statistics package.
Expand Down
10 changes: 10 additions & 0 deletions sessionctx/variable/sysvar_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1059,3 +1059,13 @@ func TestTiDBCommitterConcurrency(t *testing.T) {
require.Equal(t, val, fmt.Sprintf("%d", expected))
require.NoError(t, err)
}

func TestDefaultMemoryDebugModeValue(t *testing.T) {
vars := NewSessionVars()
val, err := GetSessionOrGlobalSystemVar(vars, TiDBMemoryDebugModeMinHeapInUse)
require.NoError(t, err)
require.Equal(t, val, "0")
val, err = GetSessionOrGlobalSystemVar(vars, TiDBMemoryDebugModeAlarmRatio)
require.NoError(t, err)
require.Equal(t, val, "0")
}
10 changes: 10 additions & 0 deletions sessionctx/variable/tidb_vars.go
Original file line number Diff line number Diff line change
Expand Up @@ -684,6 +684,16 @@ const (

// TiDBSimplifiedMetrics controls whether to unregister some unused metrics.
TiDBSimplifiedMetrics = "tidb_simplified_metrics"

// TiDBMemoryDebugModeMinHeapInUse is used to set tidb memory debug mode trigger threshold.
// When set to 0, the function is disabled.
// When set to a negative integer, use memory debug mode to detect the issue of frequent allocation and release of memory.
// We do not actively trigger gc, and check whether the `tracker memory * (1+bias ratio) > heap in use` each 5s.
// When set to a positive integer, use memory debug mode to detect the issue of memory tracking inaccurate.
// We trigger runtime.GC() each 5s, and check whether the `tracker memory * (1+bias ratio) > heap in use`.
TiDBMemoryDebugModeMinHeapInUse = "tidb_memory_debug_mode_min_heap_inuse"
// TiDBMemoryDebugModeAlarmRatio is used set tidb memory debug mode bias ratio. Treat memory bias less than this ratio as noise.
TiDBMemoryDebugModeAlarmRatio = "tidb_memory_debug_mode_alarm_ratio"
)

// TiDB vars that have only global scope
Expand Down
14 changes: 14 additions & 0 deletions util/memory/tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -418,6 +418,20 @@ func (t *Tracker) SearchTrackerWithoutLock(label int) *Tracker {
return nil
}

// SearchTrackerConsumedMoreThanNBytes searches the specific tracker that consumes more than NBytes.
func (t *Tracker) SearchTrackerConsumedMoreThanNBytes(limit int64) (res []*Tracker) {
t.mu.Lock()
defer t.mu.Unlock()
for _, childSlice := range t.mu.children {
for _, tracker := range childSlice {
if tracker.BytesConsumed() > limit {
res = append(res, tracker)
}
}
}
return
}

// String returns the string representation of this Tracker tree.
func (t *Tracker) String() string {
buffer := bytes.NewBufferString("\n")
Expand Down

0 comments on commit bdc6397

Please sign in to comment.