Skip to content

Commit

Permalink
planner: revise sync load error msg (#39198)
Browse files Browse the repository at this point in the history
  • Loading branch information
Yisaer authored Nov 18, 2022
1 parent 50fdeef commit 48e17e2
Show file tree
Hide file tree
Showing 6 changed files with 56 additions and 43 deletions.
4 changes: 4 additions & 0 deletions executor/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -1951,6 +1951,10 @@ func ResetContextOfStmt(ctx sessionctx.Context, s ast.StmtNode) (err error) {
sc.UseDynamicPruneMode = false
}

sc.StatsLoad.Timeout = 0
sc.StatsLoad.NeededItems = nil
sc.StatsLoad.ResultCh = nil

sc.SysdateIsNow = ctx.GetSessionVars().SysdateIsNow

vars.MemTracker.UnbindActions()
Expand Down
50 changes: 25 additions & 25 deletions planner/core/plan_stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,10 @@ import (
"context"
"time"

"github.com/pingcap/errors"
"github.com/pingcap/tidb/domain"
"github.com/pingcap/tidb/infoschema"
"github.com/pingcap/tidb/parser/model"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/sessionctx/stmtctx"
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/statistics"
"github.com/pingcap/tidb/util/logutil"
Expand Down Expand Up @@ -66,7 +64,10 @@ func (syncWaitStatsLoadPoint) optimize(_ context.Context, plan LogicalPlan, _ *l
if plan.SCtx().GetSessionVars().InRestrictedSQL {
return plan, nil
}
_, err := SyncWaitStatsLoad(plan)
if plan.SCtx().GetSessionVars().StmtCtx.IsSyncStatsFailed {
return plan, nil
}
err := SyncWaitStatsLoad(plan)
return plan, err
}

Expand All @@ -91,37 +92,36 @@ func RequestLoadStats(ctx sessionctx.Context, neededHistItems []model.TableItemI
var timeout = time.Duration(waitTime)
err := domain.GetDomain(ctx).StatsHandle().SendLoadRequests(stmtCtx, neededHistItems, timeout)
if err != nil {
logutil.BgLogger().Warn("SendLoadRequests failed", zap.Error(err))
stmtCtx.IsSyncStatsFailed = true
return handleTimeout(stmtCtx)
if variable.StatsLoadPseudoTimeout.Load() {
logutil.BgLogger().Warn("RequestLoadStats failed", zap.Error(err))
stmtCtx.AppendWarning(err)
return nil
}
logutil.BgLogger().Error("RequestLoadStats failed", zap.Error(err))
return err
}
return nil
}

// SyncWaitStatsLoad sync-wait for stats load until timeout
func SyncWaitStatsLoad(plan LogicalPlan) (bool, error) {
func SyncWaitStatsLoad(plan LogicalPlan) error {
stmtCtx := plan.SCtx().GetSessionVars().StmtCtx
if stmtCtx.StatsLoad.Fallback {
return false, nil
}
success := domain.GetDomain(plan.SCtx()).StatsHandle().SyncWaitStatsLoad(stmtCtx)
if success {
return true, nil
}
logutil.BgLogger().Warn("SyncWaitStatsLoad failed")
stmtCtx.IsSyncStatsFailed = true
err := handleTimeout(stmtCtx)
return false, err
}

func handleTimeout(stmtCtx *stmtctx.StatementContext) error {
err := errors.New("Timeout when sync-load full stats for needed columns")
if variable.StatsLoadPseudoTimeout.Load() {
stmtCtx.AppendWarning(err)
stmtCtx.StatsLoad.Fallback = true
if len(stmtCtx.StatsLoad.NeededItems) <= 0 {
return nil
}
return err
err := domain.GetDomain(plan.SCtx()).StatsHandle().SyncWaitStatsLoad(stmtCtx)
if err != nil {
stmtCtx.IsSyncStatsFailed = true
if variable.StatsLoadPseudoTimeout.Load() {
logutil.BgLogger().Warn("SyncWaitStatsLoad failed", zap.Error(err))
stmtCtx.AppendWarning(err)
return nil
}
logutil.BgLogger().Error("SyncWaitStatsLoad failed", zap.Error(err))
return err
}
return nil
}

// collectSyncIndices will collect the indices which includes following conditions:
Expand Down
2 changes: 0 additions & 2 deletions sessionctx/stmtctx/stmtctx.go
Original file line number Diff line number Diff line change
Expand Up @@ -297,8 +297,6 @@ type StatementContext struct {
NeededItems []model.TableItemID
// ResultCh to receive stats loading results
ResultCh chan StatsLoadResult
// Fallback indicates if the planner uses full-loaded stats or fallback all to pseudo/simple.
Fallback bool
// LoadStartTime is to record the load start time to calculate latency
LoadStartTime time.Time
}
Expand Down
3 changes: 0 additions & 3 deletions statistics/column.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,9 +121,6 @@ func (c *Column) IsInvalid(sctx sessionctx.Context, collPseudo bool) bool {
}
if sctx != nil {
stmtctx := sctx.GetSessionVars().StmtCtx
if stmtctx != nil && stmtctx.StatsLoad.Fallback {
return true
}
if c.IsLoadNeeded() && stmtctx != nil {
if stmtctx.StatsLoad.Timeout > 0 {
logutil.BgLogger().Warn("Hist for column should already be loaded as sync but not found.",
Expand Down
36 changes: 25 additions & 11 deletions statistics/handle/handle_hist.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,25 +75,33 @@ func (h *Handle) SendLoadRequests(sc *stmtctx.StatementContext, neededHistItems
sc.StatsLoad.Timeout = timeout
sc.StatsLoad.NeededItems = remainedItems
sc.StatsLoad.ResultCh = make(chan stmtctx.StatsLoadResult, len(remainedItems))
tasks := make([]*NeededItemTask, 0)
for _, item := range remainedItems {
task := &NeededItemTask{
TableItemID: item,
ToTimeout: time.Now().Local().Add(timeout),
ResultCh: sc.StatsLoad.ResultCh,
}
err := h.AppendNeededItem(task, timeout)
if err != nil {
return err
tasks = append(tasks, task)
}
timer := time.NewTimer(timeout)
defer timer.Stop()
for _, task := range tasks {
select {
case h.StatsLoad.NeededItemsCh <- task:
continue
case <-timer.C:
return errors.New("sync load stats channel is full and timeout sending task to channel")
}
}
sc.StatsLoad.LoadStartTime = time.Now()
return nil
}

// SyncWaitStatsLoad sync waits loading of neededColumns and return false if timeout
func (h *Handle) SyncWaitStatsLoad(sc *stmtctx.StatementContext) bool {
func (h *Handle) SyncWaitStatsLoad(sc *stmtctx.StatementContext) error {
if len(sc.StatsLoad.NeededItems) <= 0 {
return true
return nil
}
var errorMsgs []string
defer func() {
Expand All @@ -120,15 +128,14 @@ func (h *Handle) SyncWaitStatsLoad(sc *stmtctx.StatementContext) bool {
delete(resultCheckMap, result.Item)
if len(resultCheckMap) == 0 {
metrics.SyncLoadHistogram.Observe(float64(time.Since(sc.StatsLoad.LoadStartTime).Milliseconds()))
return true
return nil
}
} else {
return false
return errors.New("sync load stats channel closed unexpectedly")
}
case <-timer.C:
metrics.SyncLoadTimeoutCounter.Inc()
logutil.BgLogger().Warn("SyncWaitStatsLoad timeout")
return false
return errors.New("sync load stats timeout")
}
}
}
Expand All @@ -154,9 +161,16 @@ func (h *Handle) removeHistLoadedColumns(neededItems []model.TableItemID) []mode
return remainedItems
}

// AppendNeededItem appends needed columns/indices to ch, if exists, do not append the duplicated one.
// AppendNeededItem appends needed columns/indices to ch, it is only used for test
func (h *Handle) AppendNeededItem(task *NeededItemTask, timeout time.Duration) error {
return h.writeToChanWithTimeout(h.StatsLoad.NeededItemsCh, task, timeout)
timer := time.NewTimer(timeout)
defer timer.Stop()
select {
case h.StatsLoad.NeededItemsCh <- task:
case <-timer.C:
return errors.New("Channel is full and timeout writing to channel")
}
return nil
}

var errExit = errors.New("Stop loading since domain is closed")
Expand Down
4 changes: 2 additions & 2 deletions statistics/handle/handle_hist_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ func TestConcurrentLoadHist(t *testing.T) {
timeout := time.Nanosecond * mathutil.MaxInt
h.SendLoadRequests(stmtCtx, neededColumns, timeout)
rs := h.SyncWaitStatsLoad(stmtCtx)
require.True(t, rs)
require.Nil(t, rs)
stat = h.GetTableStats(tableInfo)
hg = stat.Columns[tableInfo.Columns[2].ID].Histogram
topn = stat.Columns[tableInfo.Columns[2].ID].TopN
Expand Down Expand Up @@ -132,7 +132,7 @@ func TestConcurrentLoadHistTimeout(t *testing.T) {
}
h.SendLoadRequests(stmtCtx, neededColumns, 0) // set timeout to 0 so task will go to timeout channel
rs := h.SyncWaitStatsLoad(stmtCtx)
require.False(t, rs)
require.Error(t, rs)
stat = h.GetTableStats(tableInfo)
hg = stat.Columns[tableInfo.Columns[2].ID].Histogram
topn = stat.Columns[tableInfo.Columns[2].ID].TopN
Expand Down

0 comments on commit 48e17e2

Please sign in to comment.