diff --git a/statistics/update.go b/statistics/update.go
index f96c38ca9d208..7bc77e80892b9 100644
--- a/statistics/update.go
+++ b/statistics/update.go
@@ -29,6 +29,7 @@ import (
 	"github.com/pingcap/tidb/sessionctx"
 	"github.com/pingcap/tidb/sessionctx/variable"
 	"github.com/pingcap/tidb/store/tikv/oracle"
+	"github.com/pingcap/tidb/types"
 	"github.com/pingcap/tidb/util/sqlexec"
 	log "github.com/sirupsen/logrus"
 	"golang.org/x/net/context"
@@ -284,80 +285,96 @@ func (h *Handle) HandleUpdateStats(is infoschema.InfoSchema) error {
 	if len(rows) == 0 || err != nil {
 		return errors.Trace(err)
 	}
-	tableID, histID, isIndex := int64(-1), int64(-1), int64(-1)
-	q := &QueryFeedback{}
-	var (
-		cms  *CMSketch
-		hist *Histogram
-	)
-	for _, row := range rows {
-		// merge into previous feedback
-		if row.GetInt64(0) == tableID && row.GetInt64(1) == histID && row.GetInt64(2) == isIndex {
-			err = decodeFeedback(row.GetBytes(3), q, cms)
-			if err != nil {
-				log.Debugf("decode feedback failed, err: %v", errors.ErrorStack(err))
-			}
-			continue
+
+	// Split rows into runs of consecutive rows that share the same tableID, histID and isIndex values.
+	var groupedRows [][]types.Row
+	preIdx := 0
+	tableID, histID, isIndex := rows[0].GetInt64(0), rows[0].GetInt64(1), rows[0].GetInt64(2)
+	for i := 1; i < len(rows); i++ {
+		row := rows[i]
+		if row.GetInt64(0) != tableID || row.GetInt64(1) != histID || row.GetInt64(2) != isIndex {
+			groupedRows = append(groupedRows, rows[preIdx:i])
+			tableID, histID, isIndex = row.GetInt64(0), row.GetInt64(1), row.GetInt64(2)
+			preIdx = i
 		}
-		// dump the stats into kv
-		if hist != nil {
-			err = h.dumpStatsUpdateToKV(tableID, int(isIndex), q, hist, cms)
-			if err != nil {
-				return errors.Trace(err)
-			}
+	}
+	groupedRows = append(groupedRows, rows[preIdx:])
+
+	for _, rows := range groupedRows {
+		if err := h.handleSingleHistogramUpdate(is, rows); err != nil {
+			return errors.Trace(err)
 		}
-		// initialize new feedback
-		tableID, histID, isIndex = row.GetInt64(0), row.GetInt64(1), row.GetInt64(2)
-		table, ok := is.TableByID(tableID)
-		if !ok {
-			hist, cms = nil, nil
-			continue
+	}
+	return nil
+}
+
+// handleSingleHistogramUpdate updates the Histogram and CM Sketch using these feedbacks. All the feedbacks for
+// the same index or column are gathered in `rows`.
+func (h *Handle) handleSingleHistogramUpdate(is infoschema.InfoSchema, rows []types.Row) (err error) {
+	tableID, histID, isIndex := rows[0].GetInt64(0), rows[0].GetInt64(1), rows[0].GetInt64(2)
+	defer func() {
+		if err == nil {
+			err = errors.Trace(h.deleteOutdatedFeedback(tableID, histID, isIndex))
 		}
-		tbl := h.GetTableStats(table.Meta())
-		if isIndex == 1 {
-			idx, ok := tbl.Indices[histID]
-			if !ok {
-				hist, cms = nil, nil
-				continue
-			}
-			hist = &idx.Histogram
+	}()
+	table, ok := is.TableByID(tableID)
+	// The table has been deleted.
+	if !ok {
+		return nil
+	}
+	tbl := h.GetTableStats(table.Meta())
+	var cms *CMSketch
+	var hist *Histogram
+	if isIndex == 1 {
+		idx, ok := tbl.Indices[histID]
+		if ok {
+			idxHist := idx.Histogram
+			hist = &idxHist
 			cms = idx.CMSketch.copy()
-		} else {
-			col, ok := tbl.Columns[histID]
-			if !ok {
-				hist, cms = nil, nil
-				continue
-			}
-			hist = &col.Histogram
-			cms = nil
 		}
-		err = decodeFeedback(row.GetBytes(3), q, cms)
-		if err != nil {
+	} else {
+		col, ok := tbl.Columns[histID]
+		if ok {
+			colHist := col.Histogram
+			hist = &colHist
+		}
+	}
+	// The column or index has been deleted.
+	if hist == nil {
+		return nil
+	}
+	q := &QueryFeedback{}
+	for _, row := range rows {
+		err1 := decodeFeedback(row.GetBytes(3), q, cms)
+		if err1 != nil {
-			log.Debugf("decode feedback failed, err: %v", errors.ErrorStack(err))
+			// Log err1: the named result err is not assigned until after this loop.
+			log.Debugf("decode feedback failed, err: %v", errors.ErrorStack(err1))
 		}
 	}
-	// dump the last feedback into kv
-	err = h.dumpStatsUpdateToKV(tableID, int(isIndex), q, hist, cms)
+	// Update the NDV of primary key column.
+	if table.Meta().PKIsHandle && isIndex == 0 {
+		hist.NDV = int64(hist.totalRowCount())
+	}
+	err = h.dumpStatsUpdateToKV(tableID, isIndex, q, hist, cms)
 	return errors.Trace(err)
 }
 
-func (h *Handle) dumpStatsUpdateToKV(tableID int64, isIndex int, q *QueryFeedback, hist *Histogram, cms *CMSketch) (err error) {
-	defer func() {
-		if err != nil {
-			metrics.UpdateStatsCounter.WithLabelValues(metrics.LblError).Inc()
-		} else {
-			metrics.UpdateStatsCounter.WithLabelValues(metrics.LblOK).Inc()
-		}
-	}()
-	hist = UpdateHistogram(hist, q)
-	err = SaveStatsToStorage(h.ctx, tableID, -1, isIndex, hist, cms)
-	if err != nil {
-		return errors.Trace(err)
-	}
+// deleteOutdatedFeedback deletes the rows in mysql.stats_feedback that match the given table, histogram
+// and index flag. BatchDelete is enabled for the statement and set to false afterwards.
+func (h *Handle) deleteOutdatedFeedback(tableID, histID, isIndex int64) error {
 	h.ctx.GetSessionVars().BatchDelete = true
-	sql := fmt.Sprintf("delete from mysql.stats_feedback where table_id = %d and hist_id = %d and is_index = %d", tableID, hist.ID, isIndex)
-	_, err = h.ctx.(sqlexec.SQLExecutor).Execute(context.TODO(), sql)
-	q.feedback = q.feedback[:0]
+	sql := fmt.Sprintf("delete from mysql.stats_feedback where table_id = %d and hist_id = %d and is_index = %d", tableID, histID, isIndex)
+	_, err := h.ctx.(sqlexec.SQLExecutor).Execute(context.TODO(), sql)
+	h.ctx.GetSessionVars().BatchDelete = false
+	return errors.Trace(err)
+}
+
+// dumpStatsUpdateToKV applies the feedback q to hist with UpdateHistogram, saves the result through
+// SaveStatsToStorage and counts the outcome in metrics.UpdateStatsCounter.
+func (h *Handle) dumpStatsUpdateToKV(tableID, isIndex int64, q *QueryFeedback, hist *Histogram, cms *CMSketch) error {
+	hist = UpdateHistogram(hist, q)
+	err := SaveStatsToStorage(h.ctx, tableID, -1, int(isIndex), hist, cms)
+	metrics.UpdateStatsCounter.WithLabelValues(metrics.RetLabel(err)).Inc()
 	return errors.Trace(err)
 }
 
diff --git a/statistics/update_test.go b/statistics/update_test.go
index b59ea232d937d..0318e8c9c560b 100644
--- a/statistics/update_test.go
+++ b/statistics/update_test.go
@@ -542,6 +542,16 @@ func (s *testStatsUpdateSuite) TestQueryFeedback(c *C) {
 		feedback := h.GetQueryFeedback()
 		c.Assert(len(feedback), Equals, 0)
 	}
+
+	// Test that the outdated feedback won't cause panic.
+	statistics.FeedbackProbability = 1
+	for _, t := range tests {
+		testKit.MustQuery(t.sql)
+	}
+	c.Assert(h.DumpStatsDeltaToKV(), IsNil)
+	c.Assert(h.DumpStatsFeedbackToKV(), IsNil)
+	testKit.MustExec("drop table t")
+	c.Assert(h.HandleUpdateStats(s.do.InfoSchema()), IsNil)
 }
 
 func (s *testStatsUpdateSuite) TestUpdateSystemTable(c *C) {