Skip to content

Commit

Permalink
executor: make analyze killable by oom action (#39539)
Browse files Browse the repository at this point in the history
close #39559
  • Loading branch information
chrysan authored Dec 1, 2022
1 parent ff89ef2 commit 3f3e102
Show file tree
Hide file tree
Showing 5 changed files with 72 additions and 18 deletions.
7 changes: 7 additions & 0 deletions executor/analyze_col_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -595,6 +595,13 @@ func (e *AnalyzeColumnsExecV2) subMergeWorker(resultCh chan<- *samplingMergeResu
failpoint.Inject("mockAnalyzeSamplingMergeWorkerPanic", func() {
panic("failpoint triggered")
})
failpoint.Inject("mockAnalyzeMergeWorkerSlowConsume", func(val failpoint.Value) {
times := val.(int)
for i := 0; i < times; i++ {
e.memTracker.Consume(5 << 20)
time.Sleep(100 * time.Millisecond)
}
})
retCollector := statistics.NewRowSampleCollector(int(e.analyzePB.ColReq.SampleSize), e.analyzePB.ColReq.GetSampleRate(), l)
for i := 0; i < l; i++ {
retCollector.Base().FMSketches = append(retCollector.Base().FMSketches, statistics.NewFMSketch(maxSketchSize))
Expand Down
11 changes: 9 additions & 2 deletions executor/analyze_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,15 @@ package executor
import (
"context"
"strconv"
"strings"
"sync"

"github.com/pingcap/errors"
"github.com/pingcap/tidb/config"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/statistics"
"github.com/pingcap/tidb/util/memory"
"go.uber.org/atomic"
)

Expand All @@ -45,8 +47,13 @@ func isAnalyzeWorkerPanic(err error) bool {
}

func getAnalyzePanicErr(r interface{}) error {
if msg, ok := r.(string); ok && msg == globalPanicAnalyzeMemoryExceed {
return errAnalyzeOOM
if msg, ok := r.(string); ok {
if msg == globalPanicAnalyzeMemoryExceed {
return errAnalyzeOOM
}
if strings.Contains(msg, memory.PanicMemoryExceed) {
return errors.Errorf(msg, errAnalyzeOOM)
}
}
if err, ok := r.(error); ok {
if err.Error() == globalPanicAnalyzeMemoryExceed {
Expand Down
26 changes: 13 additions & 13 deletions executor/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -1978,19 +1978,19 @@ func ResetContextOfStmt(ctx sessionctx.Context, s ast.StmtNode) (err error) {
vars.MemTracker.AttachTo(GlobalAnalyzeMemoryTracker)
} else {
sc.InitMemTracker(memory.LabelForSQLText, -1)
logOnQueryExceedMemQuota := domain.GetDomain(ctx).ExpensiveQueryHandle().LogOnQueryExceedMemQuota
switch variable.OOMAction.Load() {
case variable.OOMActionCancel:
action := &memory.PanicOnExceed{ConnID: vars.ConnectionID}
action.SetLogHook(logOnQueryExceedMemQuota)
vars.MemTracker.SetActionOnExceed(action)
case variable.OOMActionLog:
fallthrough
default:
action := &memory.LogOnExceed{ConnID: vars.ConnectionID}
action.SetLogHook(logOnQueryExceedMemQuota)
vars.MemTracker.SetActionOnExceed(action)
}
}
logOnQueryExceedMemQuota := domain.GetDomain(ctx).ExpensiveQueryHandle().LogOnQueryExceedMemQuota
switch variable.OOMAction.Load() {
case variable.OOMActionCancel:
action := &memory.PanicOnExceed{ConnID: vars.ConnectionID}
action.SetLogHook(logOnQueryExceedMemQuota)
vars.MemTracker.SetActionOnExceed(action)
case variable.OOMActionLog:
fallthrough
default:
action := &memory.LogOnExceed{ConnID: vars.ConnectionID}
action.SetLogHook(logOnQueryExceedMemQuota)
vars.MemTracker.SetActionOnExceed(action)
}
sc.MemTracker.SessionID = vars.ConnectionID
sc.MemTracker.AttachTo(vars.MemTracker)
Expand Down
32 changes: 32 additions & 0 deletions executor/executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6187,6 +6187,38 @@ func TestGlobalMemoryControl2(t *testing.T) {
runtime.GC()
}

func TestGlobalMemoryControlForAnalyze(t *testing.T) {
store, dom := testkit.CreateMockStoreAndDomain(t)

tk0 := testkit.NewTestKit(t, store)
tk0.MustExec("set global tidb_mem_oom_action = 'cancel'")
tk0.MustExec("set global tidb_server_memory_limit = 512MB")
tk0.MustExec("set global tidb_server_memory_limit_sess_min_size = 128")

sm := &testkit.MockSessionManager{
PS: []*util.ProcessInfo{tk0.Session().ShowProcess()},
}
dom.ServerMemoryLimitHandle().SetSessionManager(sm)
go dom.ServerMemoryLimitHandle().Run()

tk0.MustExec("use test")
tk0.MustExec("create table t(a int)")
tk0.MustExec("insert into t select 1")
for i := 1; i <= 8; i++ {
tk0.MustExec("insert into t select * from t") // 256 Lines
}
sql := "analyze table t with 1.0 samplerate;" // Need about 100MB
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/util/memory/ReadMemStats", `return(536870912)`))
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/executor/mockAnalyzeMergeWorkerSlowConsume", `return(100)`))
defer func() {
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/util/memory/ReadMemStats"))
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/executor/mockAnalyzeMergeWorkerSlowConsume"))
}()
_, err := tk0.Exec(sql)
require.True(t, strings.Contains(err.Error(), "Out Of Memory Quota!"))
runtime.GC()
}

func TestCompileOutOfMemoryQuota(t *testing.T) {
store := testkit.CreateMockStore(t)
tk := testkit.NewTestKit(t, store)
Expand Down
14 changes: 11 additions & 3 deletions util/memory/memstats.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ import (
"runtime"
"sync/atomic"
"time"

"github.com/pingcap/failpoint"
)

var stats atomic.Pointer[globalMstats]
Expand All @@ -26,12 +28,18 @@ var stats atomic.Pointer[globalMstats]
const ReadMemInterval = 300 * time.Millisecond

// ReadMemStats read the mem stats from runtime.ReadMemStats
func ReadMemStats() *runtime.MemStats {
func ReadMemStats() (memStats *runtime.MemStats) {
s := stats.Load()
if s != nil {
return &s.m
memStats = &s.m
} else {
memStats = ForceReadMemStats()
}
return ForceReadMemStats()
failpoint.Inject("ReadMemStats", func(val failpoint.Value) {
injectedSize := val.(int)
memStats.HeapInuse += uint64(injectedSize)
})
return
}

// ForceReadMemStats is to force read memory stats.
Expand Down

0 comments on commit 3f3e102

Please sign in to comment.